La mayoría de las operaciones RDD son perezosas. Piense en un RDD como una descripción de una serie de operaciones. Un RDD no son datos. Entonces esta línea:
val textFile = sc.textFile("/user/emp.txt")
No hace nada Crea un RDD que dice "necesitaremos cargar este archivo". El archivo no está cargado en este momento.
Las operaciones RDD que requieren observar el contenido de los datos no pueden ser perezosas. (Estas se denominan acciones ). Un ejemplo es RDD.count
: para indicarle el número de líneas en el archivo, el archivo debe leerse. Entonces, si escribe textFile.count
, en este punto se leerá el archivo, se contarán las líneas y se devolverá el recuento.
¿Qué pasa si vuelves a llamar textFile.count
? Lo mismo: el archivo será leído y contado nuevamente. Nada es almacenado. Un RDD no son datos.
Entonces, ¿qué hace RDD.cache
? Si agrega textFile.cache
al código anterior:
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
No hace nada RDD.cache
También es una operación perezosa. El archivo aún no se lee. Pero ahora el RDD dice "lee este archivo y luego almacena en caché el contenido". Si luego ejecuta textFile.count
la primera vez, el archivo se cargará, se almacenará en caché y se contará. Si llama textFile.count
por segunda vez, la operación usará el caché. Solo tomará los datos del caché y contará las líneas.
El comportamiento del caché depende de la memoria disponible. Si el archivo no cabe en la memoria, por ejemplo, textFile.count
volverá al comportamiento habitual y volverá a leer el archivo.
perisist
y elegir una opción de almacenamiento que permita derramar los datos de la caché al disco.Creo que la pregunta se formularía mejor como:
¿Cuándo necesitamos llamar al caché o persistir en un RDD?
Los procesos de chispa son flojos, es decir, no pasará nada hasta que se requiera. Para responder rápidamente a la pregunta, después de
val textFile = sc.textFile("/user/emp.txt")
emitir, no sucede nada con los datos, soloHadoopRDD
se construye un, utilizando el archivo como fuente.Digamos que transformamos esos datos un poco:
Nuevamente, nada le sucede a los datos. Ahora hay un nuevo RDD
wordsRDD
que contiene una referenciatestFile
y una función que se aplicará cuando sea necesario.Solo cuando se ejecuta una acción sobre un RDD, como
wordsRDD.count
, se ejecutará la cadena RDD, llamada linaje . Es decir, los datos, desglosados en particiones, serán cargados por los ejecutores del clúster Spark,flatMap
se aplicará la función y se calculará el resultado.En un linaje lineal, como el de este ejemplo,
cache()
no es necesario. Los datos se cargarán a los ejecutores, se aplicarán todas las transformaciones y finalmentecount
se computarán, todo en la memoria, si los datos se ajustan en la memoria.cache
es útil cuando el linaje del RDD se ramifica. Digamos que desea filtrar las palabras del ejemplo anterior en un recuento de palabras positivas y negativas. Podrías hacer esto así:Aquí, cada rama emite una recarga de los datos. Agregar una
cache
declaración explícita asegurará que el procesamiento realizado previamente se conserve y reutilice. El trabajo se verá así:Por esa razón,
cache
se dice que 'rompe el linaje' ya que crea un punto de control que puede reutilizarse para su posterior procesamiento.Regla de oro: se usa
cache
cuando el linaje de su RDD se ramifica o cuando un RDD se usa varias veces como en un bucle.fuente
spark.storage.memoryFraction
. Con respecto a qué ejecutor tiene qué datos, un RDD realizará un seguimiento de sus particiones que se distribuyen en los ejecutores.cache
tampocopersist
puedo romper el linaje .¿Necesitamos llamar "caché" o "persistir" explícitamente para almacenar los datos RDD en la memoria?
Sí, solo si es necesario.
¿Los datos RDD almacenados de forma distribuida en la memoria por defecto?
¡No!
Y estas son las razones por las cuales:
Spark admite dos tipos de variables compartidas: variables de difusión, que se pueden usar para almacenar en caché un valor en la memoria en todos los nodos, y acumuladores, que son variables que solo se "agregan", como contadores y sumas.
Los RDD admiten dos tipos de operaciones: transformaciones, que crean un nuevo conjunto de datos a partir de uno existente, y acciones, que devuelven un valor al programa del controlador después de ejecutar un cálculo en el conjunto de datos. Por ejemplo, el mapa es una transformación que pasa cada elemento del conjunto de datos a través de una función y devuelve un nuevo RDD que representa los resultados. Por otro lado, reducir es una acción que agrega todos los elementos del RDD usando alguna función y devuelve el resultado final al programa del controlador (aunque también hay un reduceByKey paralelo que devuelve un conjunto de datos distribuido).
Todas las transformaciones en Spark son perezosas, ya que no calculan sus resultados de inmediato. En cambio, solo recuerdan las transformaciones aplicadas a algún conjunto de datos base (por ejemplo, un archivo). Las transformaciones solo se calculan cuando una acción requiere que se devuelva un resultado al programa del controlador. Este diseño permite que Spark se ejecute de manera más eficiente; por ejemplo, podemos darnos cuenta de que un conjunto de datos creado a través del mapa se usará en una reducción y devolverá solo el resultado de la reducción al controlador, en lugar del conjunto de datos mapeado más grande.
Por defecto, cada RDD transformado puede ser recalculado cada vez que ejecuta una acción en él. Sin embargo, también puede conservar un RDD en la memoria utilizando el método de persistencia (o caché), en cuyo caso Spark mantendrá los elementos en el clúster para un acceso mucho más rápido la próxima vez que lo consulte. También hay soporte para RDDs persistentes en disco, o replicados en múltiples nodos.
Para más detalles, consulte la guía de programación de Spark .
fuente
A continuación se muestran las tres situaciones en las que debe almacenar en caché sus RDD:
fuente
Agregar otra razón para agregar (o agregar temporalmente)
cache
llamada al método.para problemas de memoria de depuración
con el
cache
método, spark proporcionará información de depuración sobre el tamaño del RDD. así que en la interfaz de usuario integrada de spark, obtendrá información de consumo de memoria RDD. y esto resultó muy útil para diagnosticar problemas de memoria.fuente