(¿Por qué) necesitamos llamar a la caché o persistir en un RDD

171

Cuando se crea un conjunto de datos distribuido elástico (RDD) a partir de un archivo de texto o colección (o de otro RDD), ¿necesitamos llamar "caché" o "persistir" explícitamente para almacenar los datos RDD en la memoria? ¿O los datos RDD se almacenan de forma distribuida en la memoria de forma predeterminada?

val textFile = sc.textFile("/user/emp.txt")

Según tengo entendido, después del paso anterior, textFile es un RDD y está disponible en toda / parte de la memoria del nodo.

Si es así, ¿por qué necesitamos llamar "caché" o "persistir" en textFile RDD entonces?

Ramana
fuente

Respuestas:

300

La mayoría de las operaciones RDD son perezosas. Piense en un RDD como una descripción de una serie de operaciones. Un RDD no son datos. Entonces esta línea:

val textFile = sc.textFile("/user/emp.txt")

No hace nada Crea un RDD que dice "necesitaremos cargar este archivo". El archivo no está cargado en este momento.

Las operaciones RDD que requieren observar el contenido de los datos no pueden ser perezosas. (Estas se denominan acciones ). Un ejemplo es RDD.count: para indicarle el número de líneas en el archivo, el archivo debe leerse. Entonces, si escribe textFile.count, en este punto se leerá el archivo, se contarán las líneas y se devolverá el recuento.

¿Qué pasa si vuelves a llamar textFile.count? Lo mismo: el archivo será leído y contado nuevamente. Nada es almacenado. Un RDD no son datos.

Entonces, ¿qué hace RDD.cache? Si agrega textFile.cacheal código anterior:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

No hace nada RDD.cacheTambién es una operación perezosa. El archivo aún no se lee. Pero ahora el RDD dice "lee este archivo y luego almacena en caché el contenido". Si luego ejecuta textFile.countla primera vez, el archivo se cargará, se almacenará en caché y se contará. Si llama textFile.countpor segunda vez, la operación usará el caché. Solo tomará los datos del caché y contará las líneas.

El comportamiento del caché depende de la memoria disponible. Si el archivo no cabe en la memoria, por ejemplo, textFile.countvolverá al comportamiento habitual y volverá a leer el archivo.

Daniel Darabos
fuente
44
Hola, Daniel: cuando llamas a la memoria caché, ¿significa esto que el RDD no se vuelve a cargar desde la fuente (por ejemplo, un archivo de texto), ¿cómo puedes estar seguro de que los datos del archivo de texto son más recientes cuando se almacenan en caché? (¿chispa se da cuenta de esto o es una operación manual quitar la vista previa () periódicamente para garantizar que los datos de origen se vuelvan a calcular más adelante en el linaje?)
andrew.butkus
Además, si debe cancelar la recuperación periódica, si tiene un rdd que se almacena en caché, que depende de otro RDD que se almacena en caché, ¿debe cancelar la recuperación de ambos RDD para ver resultados recalculados?
andrew.butkus
21
Spark simplemente asume que el archivo nunca cambiará. Lee el archivo en un momento arbitrario y puede volver a leer partes de él según sea necesario más adelante. (Por ejemplo, si una parte de los datos se expulsó de la memoria caché). ¡Así que es mejor que mantenga sus archivos sin cambios! Simplemente cree un nuevo archivo con un nuevo nombre cuando tenga nuevos datos, luego cárguelo como un nuevo RDD. Si recibe continuamente nuevos datos, busque en Spark Streaming.
Daniel Darabos
10
Si. Los RDD son inmutables, por lo que cada RDD asume que sus dependencias también son inmutables. Spark Streaming le permite configurar tales árboles que operan en una secuencia de cambios. Pero una solución aún más simple es construir el árbol en una función que tome un nombre de archivo como parámetro. Luego simplemente llame a la función para el nuevo archivo y poof, ya tiene el nuevo árbol de cómputo.
Daniel Darabos
1
@Humoyun: en la pestaña Almacenamiento de la interfaz de usuario de Spark, puede ver qué cantidad de cada RDD se almacena en caché. Los datos pueden ser tan grandes que solo el 40% de ellos se ajusta en la memoria total que tiene para el almacenamiento en caché. Una opción en este caso es usar perisisty elegir una opción de almacenamiento que permita derramar los datos de la caché al disco.
Daniel Darabos
197

Creo que la pregunta se formularía mejor como:

¿Cuándo necesitamos llamar al caché o persistir en un RDD?

Los procesos de chispa son flojos, es decir, no pasará nada hasta que se requiera. Para responder rápidamente a la pregunta, después de val textFile = sc.textFile("/user/emp.txt")emitir, no sucede nada con los datos, solo HadoopRDDse construye un, utilizando el archivo como fuente.

Digamos que transformamos esos datos un poco:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Nuevamente, nada le sucede a los datos. Ahora hay un nuevo RDD wordsRDDque contiene una referencia testFiley una función que se aplicará cuando sea necesario.

Solo cuando se ejecuta una acción sobre un RDD, como wordsRDD.count, se ejecutará la cadena RDD, llamada linaje . Es decir, los datos, desglosados ​​en particiones, serán cargados por los ejecutores del clúster Spark, flatMapse aplicará la función y se calculará el resultado.

En un linaje lineal, como el de este ejemplo, cache()no es necesario. Los datos se cargarán a los ejecutores, se aplicarán todas las transformaciones y finalmente countse computarán, todo en la memoria, si los datos se ajustan en la memoria.

cachees útil cuando el linaje del RDD se ramifica. Digamos que desea filtrar las palabras del ejemplo anterior en un recuento de palabras positivas y negativas. Podrías hacer esto así:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Aquí, cada rama emite una recarga de los datos. Agregar una cachedeclaración explícita asegurará que el procesamiento realizado previamente se conserve y reutilice. El trabajo se verá así:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Por esa razón, cachese dice que 'rompe el linaje' ya que crea un punto de control que puede reutilizarse para su posterior procesamiento.

Regla de oro: se usa cachecuando el linaje de su RDD se ramifica o cuando un RDD se usa varias veces como en un bucle.

maasg
fuente
1
Increíble. Gracias. Una pregunta más relacionada. Cuando almacenamos en caché o persistimos, los datos se almacenarán en la memoria del ejecutor o en la memoria del nodo de trabajo. Si es la memoria del ejecutor, Cómo Spark identifica qué ejecutor tiene los datos.
Ramana
1
@RamanaUppala se usa la memoria del ejecutor. La fracción de la memoria del ejecutor utilizada para el almacenamiento en caché está controlada por la configuración spark.storage.memoryFraction. Con respecto a qué ejecutor tiene qué datos, un RDD realizará un seguimiento de sus particiones que se distribuyen en los ejecutores.
maasg
55
@maasg Corrígeme si estoy equivocado pero cachetampoco persist puedo romper el linaje .
cero323
¿Dónde se almacenarían las palabras RDD si no hemos tenido la instrucción .cache () en el ejemplo anterior?
sun_dare
¿Qué pasa si antes de las dos cuentas, unimos las dos ramas de nuevo a una rdd y contamos? En este caso, ¿es beneficioso el caché?
Xiawei Zhang el
30

¿Necesitamos llamar "caché" o "persistir" explícitamente para almacenar los datos RDD en la memoria?

Sí, solo si es necesario.

¿Los datos RDD almacenados de forma distribuida en la memoria por defecto?

¡No!

Y estas son las razones por las cuales:

  • Spark admite dos tipos de variables compartidas: variables de difusión, que se pueden usar para almacenar en caché un valor en la memoria en todos los nodos, y acumuladores, que son variables que solo se "agregan", como contadores y sumas.

  • Los RDD admiten dos tipos de operaciones: transformaciones, que crean un nuevo conjunto de datos a partir de uno existente, y acciones, que devuelven un valor al programa del controlador después de ejecutar un cálculo en el conjunto de datos. Por ejemplo, el mapa es una transformación que pasa cada elemento del conjunto de datos a través de una función y devuelve un nuevo RDD que representa los resultados. Por otro lado, reducir es una acción que agrega todos los elementos del RDD usando alguna función y devuelve el resultado final al programa del controlador (aunque también hay un reduceByKey paralelo que devuelve un conjunto de datos distribuido).

  • Todas las transformaciones en Spark son perezosas, ya que no calculan sus resultados de inmediato. En cambio, solo recuerdan las transformaciones aplicadas a algún conjunto de datos base (por ejemplo, un archivo). Las transformaciones solo se calculan cuando una acción requiere que se devuelva un resultado al programa del controlador. Este diseño permite que Spark se ejecute de manera más eficiente; por ejemplo, podemos darnos cuenta de que un conjunto de datos creado a través del mapa se usará en una reducción y devolverá solo el resultado de la reducción al controlador, en lugar del conjunto de datos mapeado más grande.

  • Por defecto, cada RDD transformado puede ser recalculado cada vez que ejecuta una acción en él. Sin embargo, también puede conservar un RDD en la memoria utilizando el método de persistencia (o caché), en cuyo caso Spark mantendrá los elementos en el clúster para un acceso mucho más rápido la próxima vez que lo consulte. También hay soporte para RDDs persistentes en disco, o replicados en múltiples nodos.

Para más detalles, consulte la guía de programación de Spark .

eliasah
fuente
1
Eso no respondió mi pregunta.
Ramana
¿Qué no responde?
eliasah
1
cuando los datos de RDD se almacenan en la memoria predeterminada, ¿por qué necesitamos llamar a Cache o Persist?
Ramana
Los RDD no se almacenan en la memoria de forma predeterminada, por lo que persistir en el RDD hace que Spark realice la transformación más rápido en el clúster
eliasah
2
Es una buena respuesta, no sé por qué fue rechazado. Es una respuesta de arriba hacia abajo, que explica cómo funcionan los RDD desde los conceptos de alto nivel. He agregado otra respuesta que va de abajo hacia arriba: a partir de "qué hace esta línea". Tal vez sea más fácil de seguir para alguien que acaba de comenzar con Spark.
Daniel Darabos
11

A continuación se muestran las tres situaciones en las que debe almacenar en caché sus RDD:

usando un RDD muchas veces

realizar múltiples acciones en el mismo RDD

para largas cadenas de transformaciones (o muy caras)

rileyss
fuente
7

Agregar otra razón para agregar (o agregar temporalmente) cachellamada al método.

para problemas de memoria de depuración

con el cachemétodo, spark proporcionará información de depuración sobre el tamaño del RDD. así que en la interfaz de usuario integrada de spark, obtendrá información de consumo de memoria RDD. y esto resultó muy útil para diagnosticar problemas de memoria.

zinc
fuente