Arhn - архитектура программирования

Эффективные операции с искровым набором данных при разделении по перекрывающимся столбцам

У меня есть набор данных ("guid", "timestamp", "agt"), как показано ниже.

val df = List(Test("a", "1", null),
   Test("b", "2", "4"),
   Test("a", "1", "3"),
   Test("b", "2", "4"),
   Test("c", "1", "3"),
   Test("a", "6", "8"),
   Test("b", "2", "4"),
   Test("a", "1", "4")

мне нужно вычислить

  • минимальная метка времени для каждой строки при группировке по guid.
  • Счетчик для каждого ключа при группировке по (guid, timestamp)
  • agtM строки, сгруппированной по guid и упорядоченной по отметке времени (desc), а затем взять первый непустой agt else ""
  • Удалить дубликаты

Таким образом, вывод будет таким, как показано ниже.

+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
|   c|        1|  3|      1|    1|   3|
|   b|        2|  4|      2|    3|   4|
|   a|        1|   |      1|    3|   8|
|   a|        6|  8|      1|    1|   8|
+----+---------+---+-------+-----+----+

я пытался

val w = Window.partitionBy($"guid")

    val w1 = Window.partitionBy($"guid", $"timestamp")
    val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    val gg = df.toDS()
      .withColumn("minimum", min("timestamp").over(w))
      .withColumn("count", count("*").over(w1))
      .withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
      .dropDuplicates("guid", "timestamp")

Однако в расчете agtM я не так уверен. Моя цель — добиться минимального перемешивания, так как в этом сценарии мы сначала группируем по guid, а затем группируем по (guid, timestamp), и логически вторая группировка должна происходить в первом созданном разделе. затем выходные данные группируются по guid и объединяются с другой таблицей. Оба данных довольно велики (в ТБ), поэтому я хотел добиться этого с минимальной перетасовкой и не хотел перемещать вычисление внутри mapGroups позже (я мог бы выполнить расчет agtM, просто отфильтровав группу с непустым временем агента, а затем maxBy метка времени). Не могли бы вы предложить лучший способ достичь вышеизложенного?

ИЗМЕНИТЬ

Исправлен расчет agtM. Просто чтобы дать больше контекста для предстоящих операций, объединение выходных данных и другого набора данных (одно дополнительное поле, мы оставили его фиктивным в выходных данных) затем необходимо будет сгруппировать по ключу для получения окончательных результатов. Я также думал вычислить эти значения (кроме окна w) внутри каждого раздела (mapPartitions), затем взять список внутри каждого раздела как еще один список и выполнить дальнейшие вычисления.


  • Ваша спецификация окна w2, похоже, не делает ничего, связанного с вашим перечисленным требованием для agtM, которое должно заполнять agtM первым непустым agt в порядке убывания метки времени. Но тогда ваш ожидаемый результат 8, соответствующий "", кажется, предполагает, что вы действительно хотите заполнить последним непустым agt? 09.03.2018
  • разве эта строка | a| 1| | 1| 3| 8| не должна быть | a| 1| 3| 1| 2| 4|? 09.03.2018
  • @LeoCВы правы. Я хочу заполнить последний непустой agt, просматривая список для guid, упорядоченного по отметке времени. Другой способ вычислить то же самое: df.toDS().filter(_.agt != "").groupByKey(r => r.guid).mapGroups((a, b) => { val agtMObject = b.maxBy(p => p.timestamp) TestWithagtM(agtMObject.guid, agtMObject.timestamp, agtMObject.agt, agtMObject.agtM) }) @RameshMaharjan (a, 1, ) — это ввод, а последний столбец должен быть 8, потому что, если вы сгруппируете по a, а затем отсортируете по отметке времени desc, то 8 — это agt, соответствующий 6, станет agtM 09.03.2018

Ответы:


1

Чтобы заполнить agtM последним непустым значением agt, вы можете использовать last("agt", ignoreNulls) с rowsBetween() для w2:

val ds = Seq(
  ("a", "1", ""),
  ("b", "2", "4"),
  ("a", "1", "3"),
  ("b", "2", "4"),
  ("c", "1", "3"),
  ("a", "6", "8"),
  ("b", "2", "4"),
  ("a", "1", "4")
).toDF("guid", "timestamp", "agt").
  as[(String, String, String)]

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).
  rowsBetween(Window.unboundedPreceding, 0)

ds.
  withColumn("minimum", min("timestamp").over(w)).
  withColumn("count", count("*").over(w1)).
  withColumn("agt", when($"agt" =!= "", $"agt")).
  withColumn("agtM", last("agt", ignoreNulls = true).over(w2)).
  na.fill("", Seq("agt")).
  dropDuplicates("guid", "timestamp").
  show
// +----+---------+---+-------+-----+----+
// |guid|timestamp|agt|minimum|count|agtM|
// +----+---------+---+-------+-----+----+
// |   c|        1|  3|      1|    1|   3|
// |   b|        2|  4|      2|    3|   4|
// |   a|        1|   |      1|    3|   8|
// |   a|        6|  8|      1|    1|   8|
// +----+---------+---+-------+-----+----+

Учитывая, что каждая из ваших спецификаций окна w, w1 и w2 имеет свои особые требования, я не уверен, что можно многое сделать для уменьшения перетасовки. Вы можете изучить неоконные подходы, хотя результирующий набор данных, который вы намереваетесь создать, хорошо подходит для использования оконных функций.

10.03.2018
  • Это не работает для следующего набора данных. val ds = Seq( ("a", "1", "8"), ("b", "2", "4"), ("a", "1", "3"), ("b", "2", "4"), ("c", "1", ""), ("a", "6", ""), ("b", "2", "4"), ("a", "1", "4") ).toDF("guid", "timestamp", "agt"). as[(String, String, String)] вывод: |guid|timestamp|agt|minimum|count|agtM| +----+---------+---+-------+-----+----+ | c| 1| | 1| 1|null| | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6| | 1| 1|null| 10.03.2018
  • Именно таким должен быть результат, потому что пустое значение в ваших новых примерах данных находится в первой строке раздела, а заполнение не будет (и логически не должно) пересекать границу раздела, определенную в спецификации окна. 10.03.2018
  • В этом случае указанный WindowSpec неверен (я не уверен, почему WindowsSpec не может учитывать всю границу, я могу не называть его обратной засыпкой, если это нелогично), поскольку он не дает желаемого результата. Желаемый (Пожалуйста, посмотрите обновленный WnSpec в исходном сообщении) |guid|timestamp| agt|minimum|count|agtM| +----+---------+----+-------------+-----+--------+ | c| 1|null| 1| 1| | | b| 2| 4| 2| 3| 4| | a| 1| 8| 1| 3| 8| | a| 6|null| 1| 1| 8| 10.03.2018
  • Оконные функции работают в отдельных оконных разделах. Если ваше требование может быть ослаблено для получения ближайших предыдущих или следующих ненулевых значений, увеличение диапазона строк до (unboundedPreceding, unboundedFollowing) будет служить цели. В таком случае, используете ли вы first или last, может не иметь большого значения. 10.03.2018

  • 2

    Мне нужно вычислить
    минимальную метку времени для каждой строки при группировке по guid.
    Количество для каждого ключа при группировке по (guid, timestamp)
    agtM строки при группировке по guid и упорядочении по отметке времени (описание), а затем взять сначала непустое значение, иначе ""

    Глядя на ваше требование, вам нужно вычислить минимум временной метки, agtM (последний) agt для группы guid и подсчитать при группировке по guid и временной метке. Эти требования предполагают, что вам потребуются три группировки и три перетасовки.

    Первая группировка и перетасовка - чтобы найти количество

    val dfWithCount = df
          .groupBy("guid", "timestamp")
          .agg(count("guid").as("count"))
    

    Вторая и третья группировка и перемешивание

    последний agt, т. е. agtM можно найти с помощью функции Window, а минимальную метку времени можно найти с помощью других groupBy и aggregation

    val dfWithMinAndMax = df.withColumn("agtM", first("agt").over(windowSpec))
          .groupBy("guid", "agtM")
          .agg(min("timestamp").as("minimum")
          )
    

    Наконец, вы join оба фрейма данных

    val finalDF = dfWithCount.join(dfWithMinAndMax, Seq("guid"))
    

    Это даст вам правильный фрейм данных но без agt

    +----+---------+-----+----+-------+
    |guid|timestamp|count|agtM|minimum|
    +----+---------+-----+----+-------+
    |c   |1        |1    |3   |1      |
    |b   |2        |3    |4   |2      |
    |a   |1        |3    |8   |1      |
    |a   |6        |1    |8   |1      |
    +----+---------+-----+----+-------+
    

    Я предполагаю, что agt не так важно, но если вам это действительно нужно, вам понадобится еще одна группировка и перетасовка и объединение

    val dfWithAgt = df.groupBy("guid", "timestamp").agg(min("agt").as("agt"))
    
    finalDF.join(dfWithAgt, Seq("guid", "timestamp"))
    

    что даст тебе

    +----+---------+-----+----+-------+---+
    |guid|timestamp|count|agtM|minimum|agt|
    +----+---------+-----+----+-------+---+
    |c   |1        |1    |3   |1      |3  |
    |b   |2        |3    |4   |2      |4  |
    |a   |1        |3    |8   |1      |   |
    |a   |6        |1    |8   |1      |8  |
    +----+---------+-----+----+-------+---+
    

    Порядок столбцов можно задать с помощью select.

    надеюсь ответ будет полезен

    10.03.2018
  • ваше решение дает правильный результат. К сожалению, мне также нужен agt в конечном выводе (он будет использоваться в последующих операциях). 11.03.2018
  • В прошлой части я тоже использовал agt :) разве ты не видел @subhodip? 11.03.2018
  • Я видел .. Я не уверен, но не могли бы вы сказать мне, является ли это более эффективным, чем решение в исходном сообщении? это выглядит менее эффективно, хотя я могу ошибаться. Я согласен, что есть разные способы добиться того же, но можно ли получить тот же результат с 2 перетасовками или меньше? что, если я разделю по guid, а затем вызову mapPartitions и преобразую список в другой набор данных (уже сопоставленный guid, даже если может существовать несколько guid), а затем вычислю agtM и минимум внутри раздела? 11.03.2018
  • Да @subhodip, вы абсолютно правы. чем больше перетасовки, тем неэффективнее решение. 11.03.2018

  • 3

    Если сначала разбить его по guid, а затем работать с итераторами, логически получится меньше перетасовки. Не уверен в эффекте, если данные внутри каждой группы огромны.

    df.toDS().groupByKey(_.guid).flatMapGroups((a,b) => {
              val list = b.toList
              val minimum = list.minBy(_.timestamp).timestamp
              val filteredList = list.filterNot(_.agt == "")
              val agtM = if(filteredList.isEmpty) "" else filteredList.maxBy(_.timestamp).agt
              list.groupBy(_.timestamp).map(r => (r._2.head.guid, r._1, r._2.head.agt, minimum, r._2.length, agtM))
            }).select($"_1".as("guid"), $"_2".as("timestamp"),
              $"_3".as("agt"), $"_4".as("minimum"), $"_5".as("count"), $"_6".as("agtM")).show()
    
    12.03.2018
    Новые материалы

    Коллекции публикаций по глубокому обучению
    Последние пару месяцев я создавал коллекции последних академических публикаций по различным подполям глубокого обучения в моем блоге https://amundtveit.com - эта публикация дает обзор 25..

    Представляем: Pepita
    Фреймворк JavaScript с открытым исходным кодом Я знаю, что недостатка в фреймворках JavaScript нет. Но я просто не мог остановиться. Я хотел написать что-то сам, со своими собственными..

    Советы по коду Laravel #2
    1-) Найти // You can specify the columns you need // in when you use the find method on a model User::find(‘id’, [‘email’,’name’]); // You can increment or decrement // a field in..

    Работа с временными рядами спутниковых изображений, часть 3 (аналитика данных)
    Анализ временных рядов спутниковых изображений для данных наблюдений за большой Землей (arXiv) Автор: Рольф Симоэс , Жильберто Камара , Жильберто Кейрос , Фелипе Соуза , Педро Р. Андраде ,..

    3 способа решить квадратное уравнение (3-й мой любимый) -
    1. Методом факторизации — 2. Используя квадратичную формулу — 3. Заполнив квадрат — Давайте поймем это, решив это простое уравнение: Мы пытаемся сделать LHS,..

    Создание VR-миров с A-Frame
    Виртуальная реальность (и дополненная реальность) стали главными модными терминами в образовательных технологиях. С недорогими VR-гарнитурами, такими как Google Cardboard , и использованием..

    Демистификация рекурсии
    КОДЕКС Демистификация рекурсии Упрощенная концепция ошеломляющей О чем весь этот шум? Рекурсия, кажется, единственная тема, от которой у каждого начинающего студента-информатика..