Apache Spark: mapa vs mapa ¿Particiones?

133

¿Cuál es la diferencia entre un RDD map y un mapPartitionsmétodo? ¿Y se flatMapcomporta como mapo como mapPartitions? Gracias.

(editar) es decir, ¿cuál es la diferencia (semánticamente o en términos de ejecución) entre

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

Y:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Nicholas White
fuente
3
Después de leer la respuesta a continuación, puede echar un vistazo a [esta experiencia] compartida por alguien que realmente la usó. ( Bzhangusc.wordpress.com/2014/06/19/… ) bzhangusc.wordpress.com/2014/06/19 / ...
Abhidemon

Respuestas:

121

¿Cuál es la diferencia entre el mapa de un RDD y el método mapPartitions?

El mapa de métodos convierte cada elemento del RDD de origen en un único elemento del RDD resultante mediante la aplicación de una función. mapPartitions convierte cada partición del RDD de origen en múltiples elementos del resultado (posiblemente ninguno).

¿Y flatMap se comporta como map o mapPartitions?

Tampoco, flatMap funciona en un solo elemento (as map) y produce múltiples elementos del resultado (as mapPartitions).

Alexey Romanov
fuente
3
Gracias, ¿el mapa causa barajaduras (o cambia el número de particiones)? ¿Mueve datos entre nodos? He estado usando mapPartitions para evitar mover datos entre nodos, pero no estaba seguro de si flapMap lo haría.
Nicholas White
Si nos fijamos en la fuente - github.com/apache/incubator-spark/blob/... y github.com/apache/incubator-spark/blob/... - tanto mapy flatMaptienen exactamente las mismas particiones como el padre.
Alexey Romanov
13
Como nota, una presentación provista por un orador en la Cumbre de San Francisco Spark 2013 (goo.gl/JZXDCR) destaca que las tareas con una sobrecarga alta por registro se desempeñan mejor con una mapPartition que con una transformación de mapa. Según la presentación, esto se debe al alto costo de configurar una nueva tarea.
Mikel Urkia
1
Estoy viendo lo contrario: incluso con operaciones muy pequeñas, es más rápido llamar a mapPartitions e iterar que llamar a map. Supongo que esto es solo la sobrecarga de iniciar el motor de lenguaje que procesará la tarea de mapa. (Estoy en R, lo que puede tener más gastos generales de inicio). Si realizaría varias operaciones, mapPartitions parece ser un poco más rápido; supongo que esto se debe a que lee el RDD solo una vez. Incluso si el RDD está almacenado en caché en la RAM, eso ahorra muchos gastos generales de la conversión de tipos.
Bob
3
mapbásicamente toma su función fy la pasa a iter.map(f). Básicamente, es un método de conveniencia que se envuelve mapPartitions. Me sorprendería si hubiera una ventaja de rendimiento en ambos sentidos para un trabajo de transformación de estilo de mapa puro (es decir, donde la función es idéntica), si necesita crear algunos objetos para el procesamiento, si estos objetos se pueden compartir, mapPartitionssería ventajoso.
NightWolf
129

Diablillo. PROPINA :

Siempre que tenga una inicialización de peso pesado que se debe hacer una vez para muchos RDDelementos en lugar de una vez por RDDelemento, y si esta inicialización, como la creación de objetos de una biblioteca de terceros, no se puede serializar (para que Spark pueda transmitirla a través del clúster a los nodos de trabajo), use en mapPartitions()lugar de map(). mapPartitions()prevé que la inicialización se realice una vez por tarea de trabajo / hilo / partición en lugar de una vez por RDDelemento de datos, por ejemplo: ver más abajo

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2 se flatMapcomporta como mapa o como mapPartitions?

Si. vea el ejemplo 2 de flatmap... se explica por sí mismo.

Q1. ¿Cuál es la diferencia entre un RDD mapymapPartitions

mapfunciona la función que se utiliza en un nivel por elemento, mientras que mapPartitionsejerce la función en el nivel de partición.

Escenario de ejemplo : si tenemos 100K elementos en unaRDDparticiónparticular,activaremos la función que está siendo utilizada por la transformación de mapeo 100K veces cuando la usemosmap.

Por el contrario, si usamos mapPartitions, solo llamaremos a la función particular una vez, pero pasaremos todos los registros de 100K y recuperaremos todas las respuestas en una llamada de función.

Habrá aumento de rendimiento ya que mapfunciona en una función en particular tantas veces, especialmente si la función está haciendo algo costoso cada vez que no tendría que hacerlo si pasamos todos los elementos a la vez (en caso de mappartitions).

mapa

Aplica una función de transformación en cada elemento del RDD y devuelve el resultado como un nuevo RDD.

Listado de variantes

mapa de definición [U: ClassTag] (f: T => U): RDD [U]

Ejemplo:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Este es un mapa especializado que se llama solo una vez para cada partición. El contenido completo de las particiones respectivas está disponible como una secuencia secuencial de valores a través del argumento de entrada (Iterarator [T]). La función personalizada debe devolver otro iterador [U]. Los iteradores de resultados combinados se convierten automáticamente en un nuevo RDD. Tenga en cuenta que las tuplas (3,4) y (6,7) faltan en el siguiente resultado debido a la partición que elegimos.

preservesPartitioningindica si la función de entrada conserva el particionador, lo que debería ser a falsemenos que sea un par RDD y la función de entrada no modifique las teclas.

Listado de variantes

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], conservaPartitions: Boolean = false): RDD [U]

Ejemplo 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Ejemplo 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

El programa anterior también se puede escribir usando flatMap de la siguiente manera.

Ejemplo 2 usando mapa plano

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusión

mapPartitionsla transformación es más rápida que mapya que llama a su función una vez / partición, no una vez / elemento ...

Lecturas adicionales: foreach Vs foreachPartitions Cuándo usar ¿Qué?

Ram Ghadiyaram
fuente
44
Sé que puedes usar mapo mapPartitionslograr el mismo resultado (ver los dos ejemplos en la pregunta); Esta pregunta es acerca de por qué elegirías un camino sobre el otro. ¡Los comentarios en la otra respuesta son realmente útiles! Además, no mencionaste eso mapy flatMappasaste falsea preservesPartitioning, y cuáles son las implicaciones de eso.
Nicholas White
2
la función ejecutada cada vez versus la función ejecutada una vez para la parición era el enlace que me faltaba. Tener acceso a más de un registro de datos a la vez con mapPartition es una cosa invaluable. aprecio la respuesta
y coma y cinta adhesiva el
1
¿Hay un escenario donde mapes mejor que mapPartitions? Si mapPartitionses tan bueno, ¿por qué no es la implementación de mapa predeterminada?
ruhong
1
@oneleggedmule: ambos son para diferentes requisitos que tenemos que usar sabiamente si está creando instancias de recursos como conexiones db (como se muestra en el ejemplo anterior) que son costosas, entonces mappartitions es el enfoque correcto ya que una conexión por partición. también saveAsTextFile particiones de mapa utilizadas internamente ver
Ram Ghadiyaram
@oneleggedmule Desde mi punto de vista, map () es más fácil de entender y aprender, y también es un método común de muchos idiomas diferentes. Puede ser más fácil de usar que mapPartitions () si alguien no está familiarizado con este método específico de Spark al principio. Si no hay diferencia de rendimiento, prefiero usar map ().
Raymond Chen
15

Mapa :

  1. Procesa una fila a la vez, muy similar al método map () de MapReduce.
  2. Regresas de la transformación después de cada fila.

MapPartitions

  1. Procesa la partición completa de una vez.
  2. Puede regresar de la función solo una vez después de procesar toda la partición.
  3. Todos los resultados intermedios deben mantenerse en la memoria hasta que procese toda la partición.
  4. Le proporciona la función setup () map () y cleanup () de MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
fuente
con respecto a 2: si realiza transformaciones de iterador a iterador y no materializa el iterador en una colección de algún tipo, no tendrá que mantener toda la partición en la memoria, de hecho, de esa manera la chispa podrá derrame partes de la partición al disco.
ilcord
44
No tiene que mantener toda la partición en la memoria, sino el resultado. No puede devolver el resultado hasta que haya procesado toda la partición
KrazyGautam