Spark, dividiendo de manera óptima un solo RDD en dos

10

Tengo un gran conjunto de datos que necesito dividir en grupos de acuerdo con parámetros específicos. Quiero que el trabajo se procese de la manera más eficiente posible. Puedo imaginar dos formas de hacerlo

Opción 1 : crear un mapa a partir del RDD original y filtrar

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Opción 2 - Filtrar RDD original directamente

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

El primer método tiene que iterar sobre todos los registros del conjunto de datos original 3 veces, donde el segundo solo tiene que hacerlo dos veces, en circunstancias normales, sin embargo, la chispa hace algo de construcción de gráficos detrás de escena, por lo que podría imaginar que son efectivamente hecho de la misma manera. Mis preguntas son: a.) ¿Es un método más eficiente que el otro, o la construcción del gráfico de chispa los hace equivalentes b.) ¿Es posible hacer esta división en una sola pasada?

jagartner
fuente
También me encontré con un problema muy similar y realmente no encontré una solución. Pero lo que realmente sucede no está claro en este código, porque la chispa tiene una 'evaluación perezosa' y supuestamente es capaz de ejecutar solo lo que realmente necesita ejecutar, y también de combinar mapas, filtros y todo lo que se pueda hacer juntos. Entonces, posiblemente lo que describas pueda suceder en un solo pase. Sin embargo, no estoy lo suficientemente familiarizado con los mecanismos de evaluación perezosa. En realidad, acabo de notar el .cache (). ¿Quizás hay una manera de hacer solo un .cache () y obtener los resultados completos?
user3780968

Respuestas:

9

Antes que nada déjame decirte que no soy un experto en Spark; Lo he estado usando bastante en los últimos meses, y creo que ahora lo entiendo, pero puedo estar equivocado.

Entonces, respondiendo a sus preguntas:

a.) son equivalentes, pero no en la forma en que lo estás viendo; Spark no optimizará el gráfico si se lo pregunta, pero customMapperaún se ejecutará dos veces en ambos casos; esto se debe al hecho de que para spark, rdd1y rdd2son dos RDD completamente diferentes, y construirá el gráfico de transformación de abajo hacia arriba a partir de las hojas; entonces la opción 1 se traducirá a:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Como dijiste, customMapperse ejecuta dos veces (además, también rddInse leerá dos veces, lo que significa que si proviene de una base de datos, puede ser aún más lento).

b.) hay una manera, solo tienes que moverte cache()en el lugar correcto:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Al hacer esto, le estamos diciendo a chispa que puede almacenar los resultados parciales de mappedRdd; luego usará estos resultados parciales para rdd1y rdd2. Desde el punto de vista de la chispa, esto es equivalente a:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
StefanoP
fuente