Convierta la columna Spark DataFrame a la lista de Python

104

Trabajo en un marco de datos con dos columnas, mvv y count.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |

Me gustaría obtener dos listas que contengan valores mvv y valor de recuento. Algo como

mvv = [1,2,3,4]
count = [5,9,3,1]

Entonces, probé el siguiente código: La primera línea debería devolver una lista de filas de Python. Quería ver el primer valor:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

Pero recibo un mensaje de error con la segunda línea:

AttributeError: getInt

a.moussa
fuente
A partir del Spark 2.3, este código es el más rápido y con menos probabilidades de causar excepciones OutOfMemory: list(df.select('mvv').toPandas()['mvv']). Arrow se integró en PySpark, lo que aceleró toPandassignificativamente. No use los otros enfoques si está usando Spark 2.3+. Consulte mi respuesta para obtener más detalles sobre la evaluación comparativa.
Potencias

Respuestas:

141

Mira, por qué esta forma en que lo estás haciendo no funciona. Primero, está tratando de obtener un número entero de un tipo de fila , el resultado de su recopilación es así:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

Si toma algo como esto:

>>> firstvalue = mvv_list[0].mvv
Out: 1

Obtendrás el mvvvalor. Si desea toda la información de la matriz, puede tomar algo como esto:

>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]

Pero si intentas lo mismo para la otra columna, obtienes:

>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

Esto sucede porque countes un método incorporado. Y la columna tiene el mismo nombre que count. Una solución alternativa para hacer esto es cambiar el nombre de la columna de counta _count:

>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

Pero esta solución alternativa no es necesaria, ya que puede acceder a la columna utilizando la sintaxis del diccionario:

>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]

¡Y finalmente funcionará!

Thiago Baldim
fuente
funciona muy bien para la primera columna, pero no funciona para el recuento de columnas, creo que debido a (el recuento de funciones de chispa)
a.moussa
¿Puedes agregar qué estás haciendo con el recuento? Agregue aquí en los comentarios.
Thiago Baldim
gracias por tu respuesta Así que esta línea funciona mvv_list = [int (i.mvv) para i en mvv_count.select ('mvv'). collect ()] pero no esta count_list = [int (i.count) para i en mvv_count .select ('count'). collect ()] devuelve una sintaxis no válida
a.moussa
No es necesario que agregue este select('count')uso de esta manera: count_list = [int(i.count) for i in mvv_list.collect()]agregaré el ejemplo a la respuesta.
Thiago Baldim
1
@ a.moussa [i.['count'] for i in mvv_list.collect()]trabajos para que sea explícita para usar la columna denominada 'contar' y no a la countfunción
user989762
103

Seguir un trazador de líneas le da la lista que desea.

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
Neo
fuente
3
En cuanto al rendimiento, esta solución es mucho más rápida que su solución mvv_list = [int (i.mvv) for i in mvv_count.select ('mvv'). Collect ()]
Chanaka Fernando
Esta es, con mucho, la mejor solución que he visto. Gracias.
hui chen
22

Esto le dará todos los elementos como una lista.

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)
Muhammad Raihan Muhaimin
fuente
1
Esta es la solución más rápida y eficiente para Spark 2.3+. Vea los resultados de la evaluación comparativa en mi respuesta.
Potencias
16

El siguiente código te ayudará

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
Itachi
fuente
3
Esta debería ser la respuesta aceptada. la razón es que usted permanece en un contexto de chispa durante todo el proceso y luego recolecta al final en lugar de salir del contexto de chispa antes, lo que puede causar una recolección mayor dependiendo de lo que esté haciendo.
AntiPawn79
15

En mis datos obtuve estos puntos de referencia:

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0,52 segundos

>>> [row[col] for row in data.collect()]

0.271 segundos

>>> list(data.select(col).toPandas()[col])

0.427 segundos

El resultado es el mismo

hombres luminosos
fuente
1
Si lo usa en toLocalIteratorlugar de collect, debería ser incluso más eficiente en la memoria[row[col] for row in data.toLocalIterator()]
oglop
6

Si recibe el siguiente error:

AttributeError: el objeto 'lista' no tiene atributo 'recopilar'

Este código resolverá sus problemas:

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]
anirban sen
fuente
También obtuve ese error y esta solución resolvió el problema. Pero, ¿por qué recibí el error? (¡Muchos otros parecen no entender eso!)
bikashg
3

Ejecuté un análisis comparativo y list(mvv_count_df.select('mvv').toPandas()['mvv'])es el método más rápido. Estoy muy sorprendido.

Ejecuté los diferentes enfoques en 100 mil / 100 millones de conjuntos de datos de filas utilizando un clúster i3.xlarge de 5 nodos (cada nodo tiene 30,5 GB de RAM y 4 núcleos) con Spark 2.4.5. Los datos se distribuyeron uniformemente en 20 archivos Parquet comprimidos y ágiles con una sola columna.

Estos son los resultados de la evaluación comparativa (tiempos de ejecución en segundos):

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

Reglas de oro a seguir al recopilar datos en el nodo del controlador:

  • Intente resolver el problema con otros enfoques. La recopilación de datos en el nodo del controlador es costosa, no aprovecha la potencia del clúster Spark y debe evitarse siempre que sea posible.
  • Reúna la menor cantidad de filas posible. Agregue, deduplica, filtre y pode columnas antes de recopilar los datos. Envíe la menor cantidad de datos posible al nodo del controlador.

toPandas se mejoró significativamente en Spark 2.3 . Probablemente no sea el mejor enfoque si está utilizando una versión Spark anterior a la 2.3.

Consulte aquí para obtener más detalles / resultados de evaluación comparativa.

Potestades
fuente
2

Una posible solución es usar la collect_list()función de pyspark.sql.functions. Esto agregará todos los valores de columna en una matriz pyspark que se convierte en una lista de Python cuando se recopila:

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0] 
phgui
fuente