У меня есть набор данных ("guid", "timestamp", "agt"), как показано ниже.
val df = List(Test("a", "1", null),
Test("b", "2", "4"),
Test("a", "1", "3"),
Test("b", "2", "4"),
Test("c", "1", "3"),
Test("a", "6", "8"),
Test("b", "2", "4"),
Test("a", "1", "4")
мне нужно вычислить
- минимальная метка времени для каждой строки при группировке по guid.
- Счетчик для каждого ключа при группировке по (guid, timestamp)
- agtM строки, сгруппированной по guid и упорядоченной по отметке времени (desc), а затем взять первый непустой agt else ""
- Удалить дубликаты
Таким образом, вывод будет таким, как показано ниже.
+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
| c| 1| 3| 1| 1| 3|
| b| 2| 4| 2| 3| 4|
| a| 1| | 1| 3| 8|
| a| 6| 8| 1| 1| 8|
+----+---------+---+-------+-----+----+
я пытался
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val gg = df.toDS()
.withColumn("minimum", min("timestamp").over(w))
.withColumn("count", count("*").over(w1))
.withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
.dropDuplicates("guid", "timestamp")
Однако в расчете agtM я не так уверен. Моя цель — добиться минимального перемешивания, так как в этом сценарии мы сначала группируем по guid, а затем группируем по (guid, timestamp), и логически вторая группировка должна происходить в первом созданном разделе. затем выходные данные группируются по guid и объединяются с другой таблицей. Оба данных довольно велики (в ТБ), поэтому я хотел добиться этого с минимальной перетасовкой и не хотел перемещать вычисление внутри mapGroups позже (я мог бы выполнить расчет agtM, просто отфильтровав группу с непустым временем агента, а затем maxBy метка времени). Не могли бы вы предложить лучший способ достичь вышеизложенного?
ИЗМЕНИТЬ
Исправлен расчет agtM. Просто чтобы дать больше контекста для предстоящих операций, объединение выходных данных и другого набора данных (одно дополнительное поле, мы оставили его фиктивным в выходных данных) затем необходимо будет сгруппировать по ключу для получения окончательных результатов. Я также думал вычислить эти значения (кроме окна w) внутри каждого раздела (mapPartitions), затем взять список внутри каждого раздела как еще один список и выполнить дальнейшие вычисления.
val ds = Seq( ("a", "1", "8"), ("b", "2", "4"), ("a", "1", "3"), ("b", "2", "4"), ("c", "1", ""), ("a", "6", ""), ("b", "2", "4"), ("a", "1", "4") ).toDF("guid", "timestamp", "agt"). as[(String, String, String)]
вывод:|guid|timestamp|agt|minimum|count|agtM| +----+---------+---+-------+-----+----+ | c| 1| | 1| 1|null| | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6| | 1| 1|null|
10.03.2018|guid|timestamp| agt|minimum|count|agtM| +----+---------+----+-------------+-----+--------+ | c| 1|null| 1| 1| | | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6|null| 1| 1| 8|
10.03.2018first
илиlast
, может не иметь большого значения. 10.03.2018