Último registro de marco dinámico

from datetime import date

rdd = sc.parallelize([
    [1, date(2016, 1, 7), 13.90],
    [1, date(2016, 1, 16), 14.50],
    [2, date(2016, 1, 9), 10.50],
    [2, date(2016, 1, 28), 5.50],
    [3, date(2016, 1, 5), 1.50]
])

df = rdd.toDF(['id','date','price'])
df.show()

+---+----------+-----+
| id|      date|price|
+---+----------+-----+
|  1|2016-01-07| 13.9|
|  1|2016-01-16| 14.5|
|  2|2016-01-09| 10.5|
|  2|2016-01-28|  5.5|
|  3|2016-01-05|  1.5|
+---+----------+-----+

df.registerTempTable("entries") // Replaced by createOrReplaceTempView in Spark 2.0

output = sqlContext.sql('''
    SELECT 
        *
    FROM (
        SELECT 
            *,
            dense_rank() OVER (PARTITION BY id ORDER BY date DESC) AS rank
        FROM entries
    ) vo WHERE rank = 1
''');

output.show();

+---+----------+-----+----+
| id|      date|price|rank|
+---+----------+-----+----+
|  1|2016-01-16| 14.5|   1|
|  2|2016-01-28|  5.5|   1|
|  3|2016-01-05|  1.5|   1|
+---+----------+-----+----+
Talented Tarsier