Estoy explorando el comportamiento de Spark al unir una tabla consigo misma. Estoy usando Databricks.
Mi escenario ficticio es:
Leer una tabla externa como marco de datos A (los archivos subyacentes están en formato delta)
Defina el marco de datos B como el marco de datos A con solo ciertas columnas seleccionadas
Unir los marcos de datos A y B en la columna 1 y la columna 2
(Sí, no tiene mucho sentido, solo estoy experimentando para entender la mecánica subyacente de Spark)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Mi primer intento fue ejecutar el código tal como está (intento 1). Luego intenté repartir y almacenar en caché (intento 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Finalmente, particioné, clasifiqué y almacené en caché
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Los dags respectivos generados están como adjuntos.
Mis preguntas son:
¿Por qué en el intento 1 la tabla parece estar en caché aunque el almacenamiento en caché no se haya especificado explícitamente?
Por qué InMemoreTableScan siempre es seguido por otro nodo de este tipo.
¿Por qué en el intento 3 el almacenamiento en caché parece tener lugar en dos etapas?
Por qué en el intento 3 WholeStageCodegen sigue a uno (y solo uno) InMemoreTableScan.
Respuestas:
Lo que observa en estos 3 planes es una mezcla de tiempo de ejecución de DataBricks y Spark.
En primer lugar, mientras se ejecuta DataBricks runtime 3.3+, el almacenamiento en caché se habilita automáticamente para todos los archivos de parquet. Configuración correspondiente para eso:
spark.databricks.io.cache.enabled true
Para su segunda consulta, InMemoryTableScan ocurre dos veces porque justo cuando se llamó a join, spark intentó calcular el conjunto de datos A y el conjunto de datos B en paralelo. Suponiendo que a los diferentes ejecutores se les asignaron las tareas anteriores, ambos deberán escanear la tabla desde el caché (DataBricks).
Para el tercero, InMemoryTableScan no se refiere al almacenamiento en caché en sí mismo. Simplemente significa que cualquier plan de catalizador formado implicaba escanear la tabla en caché varias veces.
PD: no puedo visualizar el punto 4 :)
fuente