Tengo un gran conjunto de datos que necesito dividir en grupos de acuerdo con parámetros específicos. Quiero que el trabajo se procese de la manera más eficiente posible. Puedo imaginar dos formas de hacerlo
Opción 1 : crear un mapa a partir del RDD original y filtrar
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Opción 2 - Filtrar RDD original directamente
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
El primer método tiene que iterar sobre todos los registros del conjunto de datos original 3 veces, donde el segundo solo tiene que hacerlo dos veces, en circunstancias normales, sin embargo, la chispa hace algo de construcción de gráficos detrás de escena, por lo que podría imaginar que son efectivamente hecho de la misma manera. Mis preguntas son: a.) ¿Es un método más eficiente que el otro, o la construcción del gráfico de chispa los hace equivalentes b.) ¿Es posible hacer esta división en una sola pasada?
fuente
Respuestas:
Antes que nada déjame decirte que no soy un experto en Spark; Lo he estado usando bastante en los últimos meses, y creo que ahora lo entiendo, pero puedo estar equivocado.
Entonces, respondiendo a sus preguntas:
a.) son equivalentes, pero no en la forma en que lo estás viendo; Spark no optimizará el gráfico si se lo pregunta, pero
customMapper
aún se ejecutará dos veces en ambos casos; esto se debe al hecho de que para spark,rdd1
yrdd2
son dos RDD completamente diferentes, y construirá el gráfico de transformación de abajo hacia arriba a partir de las hojas; entonces la opción 1 se traducirá a:Como dijiste,
customMapper
se ejecuta dos veces (además, tambiénrddIn
se leerá dos veces, lo que significa que si proviene de una base de datos, puede ser aún más lento).b.) hay una manera, solo tienes que moverte
cache()
en el lugar correcto:Al hacer esto, le estamos diciendo a chispa que puede almacenar los resultados parciales de
mappedRdd
; luego usará estos resultados parciales parardd1
yrdd2
. Desde el punto de vista de la chispa, esto es equivalente a:fuente