Una gran diferencia, que no se menciona claramente en ninguna otra respuesta de desbordamiento de pila relacionada con este tema, es que reducese le debe dar un monoide conmutativo , es decir, una operación que sea tanto conmutativa como asociativa. Esto significa que la operación se puede paralelizar.
Esta distinción es muy importante para Big Data / MPP / computación distribuida, y la razón por la que reduceincluso existe. La colección se puede cortar y la reducepuede operar en cada fragmento, luego reducepuede operar en los resultados de cada fragmento; de hecho, el nivel de fragmentación no tiene por qué detenerse en un nivel de profundidad. También podríamos cortar cada trozo. Esta es la razón por la que sumar enteros en una lista es O (log N) si se le da un número infinito de CPU.
Si solo miras las firmas, no hay razón para reduceque exista porque puedes lograr todo lo que puedas reducecon un foldLeft. La funcionalidad de foldLeftes mayor que la funcionalidad de reduce.
Pero no puede paralelizar a foldLeft, por lo que su tiempo de ejecución es siempre O (N) (incluso si alimenta un monoide conmutativo). Esto se debe a que se supone que la operación no es un monoide conmutativo y, por lo tanto, el valor acumulado se calculará mediante una serie de agregaciones secuenciales.
foldLeftno asume conmutatividad ni asociatividad. Es la asociatividad la que da la capacidad de dividir la colección, y es la conmutatividad la que facilita la acumulación porque el orden no es importante (por lo que no importa en qué orden agregar cada uno de los resultados de cada uno de los fragmentos). Estrictamente hablando, la conmutatividad no es necesaria para la paralelización, por ejemplo, los algoritmos de clasificación distribuida, simplemente hace que la lógica sea más fácil porque no necesita ordenar sus fragmentos.
Si echas un vistazo a la documentación de Spark reduce, dice específicamente "... operador binario conmutativo y asociativo"
Aquí hay una prueba de que reduceNO es solo un caso especial defoldLeft
scala>val intParList:ParSeq[Int]=(1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))Took462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))Took2589.363031 milli seconds
reducir vs doblar
Aquí es donde se acerca un poco más a las raíces matemáticas / FP, y es un poco más complicado de explicar. Reducir se define formalmente como parte del paradigma MapReduce, que se ocupa de las colecciones sin orden (multisets), Fold se define formalmente en términos de recursividad (ver catamorfismo) y, por lo tanto, asume una estructura / secuencia para las colecciones.
No existe un foldmétodo en Scalding porque bajo el modelo de programación (estricto) Map Reduce no podemos definir foldporque los fragmentos no tienen un orden y foldsolo requieren asociatividad, no conmutatividad.
En pocas palabras, reducefunciona sin un orden de acumulación, foldrequiere un orden de acumulación y es ese orden de acumulación el que necesita un valor cero, NO la existencia del valor cero que los distingue. Estrictamente hablando, reducedebería funcionar en una colección vacía, porque su valor cero puede deducirse tomando un valor arbitrario xy luego resolviendo x op y = x, pero eso no funciona con una operación no conmutativa, ya que puede existir un valor cero izquierdo y derecho que son distintos (es decir x op y != y op x). Por supuesto, Scala no se molesta en averiguar cuál es este valor cero, ya que eso requeriría hacer algunas matemáticas (que probablemente no se puedan calcular), por lo que solo lanza una excepción.
Parece (como suele ser el caso en etimología) que este significado matemático original se ha perdido, ya que la única diferencia obvia en la programación es la firma. El resultado es que se reduceha convertido en un sinónimo foldde MapReduce, en lugar de conservar su significado original. Ahora, estos términos a menudo se usan indistintamente y se comportan igual en la mayoría de las implementaciones (ignorando las colecciones vacías). La rareza se ve agravada por peculiaridades, como en Spark, que ahora abordaremos.
Así que la chispa no tiene unafold , pero el orden en el que se combinan sub resultados (uno para cada partición) (en el momento de la escritura) es el mismo orden en que se hayan completado las tareas - y por lo tanto no determinista. Gracias a @CafeFeed por señalar ese folduso runJob, que después de leer el código me di cuenta de que no es determinista. Spark crea más confusión porque tiene un treeReducepero no treeFold.
Conclusión
Hay una diferencia entre reduceyfold incluso cuando se aplica a secuencias no vacías. El primero se define como parte del paradigma de programación MapReduce en colecciones con orden arbitrario ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) y se debe asumir que los operadores son conmutativos además de ser asociativo para dar resultados deterministas. Este último se define en términos de catomorfismos y requiere que las colecciones tengan una noción de secuencia (o se definan de forma recursiva, como listas enlazadas), por lo que no requieren operadores conmutativos.
En la práctica, debido a la naturaleza no matemática de la programación, reduce y foldtienden a comportarse de la misma manera, ya sea correctamente (como en Scala) o incorrectamente (como en Spark).
Extra: Mi opinión sobre la API Spark
Mi opinión es que se evitaría la confusión si el uso del término foldse eliminara por completo en Spark. Al menos Spark tiene una nota en su documentación:
Esto se comporta de forma algo diferente a las operaciones de plegado implementadas para colecciones no distribuidas en lenguajes funcionales como Scala.
Es por eso que foldLeftcontiene el Leften su nombre y por eso también hay un método llamado fold.
kiritsuku
1
@Cloudtech Esa es una coincidencia de su implementación de un solo hilo, no dentro de su especificación. En mi máquina de 4 núcleos, si intento agregar .par, (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)obtengo resultados diferentes cada vez.
samthebest
2
@AlexDean en el contexto de la informática, no, realmente no necesita una identidad, ya que las colecciones vacías tienden a arrojar excepciones. Pero es matemáticamente más elegante (y sería más elegante si las colecciones hicieran esto) si el elemento de identidad se devuelve cuando la colección está vacía. En matemáticas "lanzar una excepción" no existe.
samthebest
3
@samthebest: ¿Estás seguro de la conmutatividad? github.com/apache/spark/blob/… dice "Para las funciones que no son conmutativas, el resultado puede diferir del de un pliegue aplicado a una colección no distribuida".
Make42
1
@ Make42 Eso es correcto, uno podría escribir su propio reallyFoldchulo, ya que:, rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)esto no necesitaría f para viajar.
samthebest
10
Si no me equivoco, aunque la API de Spark no lo requiere, fold también requiere que la f sea conmutativa. Porque el orden en el que se agregarán las particiones no está asegurado. Por ejemplo, en el siguiente código solo se ordena la primera impresión:
Después de algunas idas y venidas, creemos que tiene razón. El orden de combinación es por orden de llegada. Si ejecuta sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)con más de 2 núcleos varias veces, creo que verá que produce un orden aleatorio (partición). He actualizado mi respuesta en consecuencia.
samthebest
3
folden Apache Spark no es lo mismo que folden colecciones no distribuidas. De hecho , requiere una función conmutativa para producir resultados deterministas:
Esto se comporta de forma algo diferente a las operaciones de plegado implementadas para colecciones no distribuidas en lenguajes funcionales como Scala. Esta operación de plegado se puede aplicar a particiones individualmente y luego plegar esos resultados en el resultado final, en lugar de aplicar el plegado a cada elemento secuencialmente en algún orden definido. Para las funciones que no son conmutativas, el resultado puede diferir del de un pliegue aplicado a una colección no distribuida.
Se ha sugerido que el comportamiento observado está relacionado con HashPartitionercuando en realidad parallelizeno se baraja y no se usa HashPartitioner.
import org.apache.spark.sql.SparkSession/* Note: standalone (non-local) mode */val master ="spark://...:7077"val spark =SparkSession.builder.master(master).getOrCreate()/* Note: deterministic order */val rdd = sc.parallelize(Seq("a","b","c","d"),4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall {caseArray(x, y)=> x < y })/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size ==24)
def fold(zeroValue: T)(op:(T, T)=> T): T = withScope {var jobResult: T
val cleanOp:(T, T)=> T
val foldPartition =Iterator[T]=> T
val mergeResult:(Int, T)=>Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
def reduce(f:(T, T)=> T): T = withScope {val cleanF:(T, T)=> T
val reducePartition:Iterator[T]=>Option[T]var jobResult:Option[T]val mergeResult =(Int,Option[T])=>Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(thrownewUnsupportedOperationException("empty collection"))}
donde runJobse realiza sin tener en cuenta el orden de partición y resulta en la necesidad de una función conmutativa.
foldPartitiony reducePartitionson equivalentes en términos de orden de procesamiento y efectivamente (por herencia y delegación) implementados por reduceLefty foldLeftsucesivamente TraversableOnce.
Conclusión: folden RDD no puede depender del orden de los trozos y necesita conmutatividad y asociatividad .
Tengo que admitir que la etimología es confusa y que la literatura de programación carece de definiciones formales. Creo que es seguro decir que folden RDDs es realmente lo mismo que reduce, pero esto no respeta las diferencias matemáticas fundamentales (he actualizado mi respuesta para que sea aún más clara). Aunque no estoy de acuerdo con que realmente necesitemos conmutatividad siempre que uno tenga confianza en que lo que sea que esté haciendo su partidista, está preservando el orden.
samthebest
El orden de plegado indefinido no está relacionado con la partición. Es una consecuencia directa de la implementación de runJob.
¡AH! Lo siento, no pude entender cuál era su punto, pero después de leer el runJobcódigo, veo que, de hecho, hace la combinación de acuerdo con el momento en que finaliza una tarea, NO el orden de las particiones. Es este detalle clave el que hace que todo encaje en su lugar. He editado mi respuesta una vez más y por lo tanto corregido el error que usted señala. ¿Podría eliminar su recompensa ya que ahora estamos de acuerdo?
samthebest
No puedo editar ni eliminar, no existe esa opción. Puedo premiar, pero creo que obtienes bastantes puntos solo con una atención, ¿me equivoco? Si confirmas que quieres que te recompense lo hago en las próximas 24 horas. Gracias por las correcciones y perdón por un método, pero parecía que ignoraste todas las advertencias, es algo importante y la respuesta se ha citado por todas partes.
1
¿Qué tal si se lo otorgas a @Mishael Rosenthal, ya que fue el primero en expresar claramente la preocupación? No tengo ningún interés en los puntos, solo me gusta usar SO para el SEO y la organización.
samthebest
2
Otra diferencia para Scalding es el uso de combinadores en Hadoop.
Imagine que su operación es un monoide conmutativo, con reducir se aplicará también en el lado del mapa en lugar de barajar / ordenar todos los datos a los reductores. Con foldLeft este no es el caso.
pipe.groupBy('product){
_.reduce('price->'total){(sum:Double, price:Double)=> sum + price }// reduce is .mapReduceMap in disguise}
pipe.groupBy('product){
_.foldLeft('price->'total)(0.0){(sum:Double, price:Double)=> sum + price }}
Siempre es una buena práctica definir sus operaciones como monoide en Scalding.
Respuestas:
reducir vs doblarIzquierda
Una gran diferencia, que no se menciona claramente en ninguna otra respuesta de desbordamiento de pila relacionada con este tema, es que
reduce
se le debe dar un monoide conmutativo , es decir, una operación que sea tanto conmutativa como asociativa. Esto significa que la operación se puede paralelizar.Esta distinción es muy importante para Big Data / MPP / computación distribuida, y la razón por la que
reduce
incluso existe. La colección se puede cortar y lareduce
puede operar en cada fragmento, luegoreduce
puede operar en los resultados de cada fragmento; de hecho, el nivel de fragmentación no tiene por qué detenerse en un nivel de profundidad. También podríamos cortar cada trozo. Esta es la razón por la que sumar enteros en una lista es O (log N) si se le da un número infinito de CPU.Si solo miras las firmas, no hay razón para
reduce
que exista porque puedes lograr todo lo que puedasreduce
con unfoldLeft
. La funcionalidad defoldLeft
es mayor que la funcionalidad dereduce
.Pero no puede paralelizar a
foldLeft
, por lo que su tiempo de ejecución es siempre O (N) (incluso si alimenta un monoide conmutativo). Esto se debe a que se supone que la operación no es un monoide conmutativo y, por lo tanto, el valor acumulado se calculará mediante una serie de agregaciones secuenciales.foldLeft
no asume conmutatividad ni asociatividad. Es la asociatividad la que da la capacidad de dividir la colección, y es la conmutatividad la que facilita la acumulación porque el orden no es importante (por lo que no importa en qué orden agregar cada uno de los resultados de cada uno de los fragmentos). Estrictamente hablando, la conmutatividad no es necesaria para la paralelización, por ejemplo, los algoritmos de clasificación distribuida, simplemente hace que la lógica sea más fácil porque no necesita ordenar sus fragmentos.Si echas un vistazo a la documentación de Spark
reduce
, dice específicamente "... operador binario conmutativo y asociativo"http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Aquí hay una prueba de que
reduce
NO es solo un caso especial defoldLeft
reducir vs doblar
Aquí es donde se acerca un poco más a las raíces matemáticas / FP, y es un poco más complicado de explicar. Reducir se define formalmente como parte del paradigma MapReduce, que se ocupa de las colecciones sin orden (multisets), Fold se define formalmente en términos de recursividad (ver catamorfismo) y, por lo tanto, asume una estructura / secuencia para las colecciones.
No existe un
fold
método en Scalding porque bajo el modelo de programación (estricto) Map Reduce no podemos definirfold
porque los fragmentos no tienen un orden yfold
solo requieren asociatividad, no conmutatividad.En pocas palabras,
reduce
funciona sin un orden de acumulación,fold
requiere un orden de acumulación y es ese orden de acumulación el que necesita un valor cero, NO la existencia del valor cero que los distingue. Estrictamente hablando,reduce
debería funcionar en una colección vacía, porque su valor cero puede deducirse tomando un valor arbitrariox
y luego resolviendox op y = x
, pero eso no funciona con una operación no conmutativa, ya que puede existir un valor cero izquierdo y derecho que son distintos (es decirx op y != y op x
). Por supuesto, Scala no se molesta en averiguar cuál es este valor cero, ya que eso requeriría hacer algunas matemáticas (que probablemente no se puedan calcular), por lo que solo lanza una excepción.Parece (como suele ser el caso en etimología) que este significado matemático original se ha perdido, ya que la única diferencia obvia en la programación es la firma. El resultado es que se
reduce
ha convertido en un sinónimofold
de MapReduce, en lugar de conservar su significado original. Ahora, estos términos a menudo se usan indistintamente y se comportan igual en la mayoría de las implementaciones (ignorando las colecciones vacías). La rareza se ve agravada por peculiaridades, como en Spark, que ahora abordaremos.Así que la chispa no tiene una
fold
, pero el orden en el que se combinan sub resultados (uno para cada partición) (en el momento de la escritura) es el mismo orden en que se hayan completado las tareas - y por lo tanto no determinista. Gracias a @CafeFeed por señalar esefold
usorunJob
, que después de leer el código me di cuenta de que no es determinista. Spark crea más confusión porque tiene untreeReduce
pero notreeFold
.Conclusión
Hay una diferencia entre
reduce
yfold
incluso cuando se aplica a secuencias no vacías. El primero se define como parte del paradigma de programación MapReduce en colecciones con orden arbitrario ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) y se debe asumir que los operadores son conmutativos además de ser asociativo para dar resultados deterministas. Este último se define en términos de catomorfismos y requiere que las colecciones tengan una noción de secuencia (o se definan de forma recursiva, como listas enlazadas), por lo que no requieren operadores conmutativos.En la práctica, debido a la naturaleza no matemática de la programación,
reduce
yfold
tienden a comportarse de la misma manera, ya sea correctamente (como en Scala) o incorrectamente (como en Spark).Extra: Mi opinión sobre la API Spark
Mi opinión es que se evitaría la confusión si el uso del término
fold
se eliminara por completo en Spark. Al menos Spark tiene una nota en su documentación:fuente
foldLeft
contiene elLeft
en su nombre y por eso también hay un método llamadofold
..par
,(List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
obtengo resultados diferentes cada vez.reallyFold
chulo, ya que:,rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)
esto no necesitaría f para viajar.Si no me equivoco, aunque la API de Spark no lo requiere, fold también requiere que la f sea conmutativa. Porque el orden en el que se agregarán las particiones no está asegurado. Por ejemplo, en el siguiente código solo se ordena la primera impresión:
Imprimir:
ABCDEFGHIJKLMNOPQRSTU VWXYZ
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
fuente
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)
con más de 2 núcleos varias veces, creo que verá que produce un orden aleatorio (partición). He actualizado mi respuesta en consecuencia.fold
en Apache Spark no es lo mismo quefold
en colecciones no distribuidas. De hecho , requiere una función conmutativa para producir resultados deterministas:Esto ha sido demostrado por Mishael Rosenthal y sugerido por Make42 en su comentario .
Se ha sugerido que el comportamiento observado está relacionado con
HashPartitioner
cuando en realidadparallelize
no se baraja y no se usaHashPartitioner
.Explicado:
Estructura de
fold
para RDDes la misma que la estructura de
reduce
RDD:donde
runJob
se realiza sin tener en cuenta el orden de partición y resulta en la necesidad de una función conmutativa.foldPartition
yreducePartition
son equivalentes en términos de orden de procesamiento y efectivamente (por herencia y delegación) implementados porreduceLeft
yfoldLeft
sucesivamenteTraversableOnce
.Conclusión:
fold
en RDD no puede depender del orden de los trozos y necesita conmutatividad y asociatividad .fuente
fold
enRDD
s es realmente lo mismo quereduce
, pero esto no respeta las diferencias matemáticas fundamentales (he actualizado mi respuesta para que sea aún más clara). Aunque no estoy de acuerdo con que realmente necesitemos conmutatividad siempre que uno tenga confianza en que lo que sea que esté haciendo su partidista, está preservando el orden.runJob
código, veo que, de hecho, hace la combinación de acuerdo con el momento en que finaliza una tarea, NO el orden de las particiones. Es este detalle clave el que hace que todo encaje en su lugar. He editado mi respuesta una vez más y por lo tanto corregido el error que usted señala. ¿Podría eliminar su recompensa ya que ahora estamos de acuerdo?Otra diferencia para Scalding es el uso de combinadores en Hadoop.
Imagine que su operación es un monoide conmutativo, con reducir se aplicará también en el lado del mapa en lugar de barajar / ordenar todos los datos a los reductores. Con foldLeft este no es el caso.
Siempre es una buena práctica definir sus operaciones como monoide en Scalding.
fuente