Как создать пакеты из 10 элементов для вычисления среднего значения в Java 8.0 с Flink 1.9.1

avatar
Jorge Lopez Marcos
1 июля 2021 в 18:15
38
1
0

Я работал с Java 8.0, Flink 1.9.1 и Maeven 1.8.

Ко мне в руки попал следующий CSV-файл, который собирает данные о поездках на такси в Нью-Йорке. dataset Вы можете скачать его отсюда: https://dl.lsdupm.ovh/yellow_tripdata_2020-01.csv

Для меня важны следующие переменные:

  • Идентификатор — столбец 0
  • когда поездка началась - Столбец 1
  • когда поездка закончилась - Столбец 2
  • пройденное расстояние — столбец 4

Идентификатор разделяет два агентства такси, которые существуют в записях. Агентство 1 и Агентство 2.

Я хотел вычислить:

  • среднее значение в секундах каждых 10 поездок каждой группы ( 1 и 2 )
  • среднее расстояние каждых 10 поездок каждой группы ( 1 и 2 )

Я хочу получить следующий результат:

  • 2, 1.68, 5395 -> (id, среднее расстояние из десяти поездок, среднее время в секундах)
  • 2, 3,26, 752
  • 1, 3,61, 897
  • 2, 4,72, 1196
  • и т.д...

Путь, которым я следую, чтобы получить этот результат:

  1. Разбирать набор данных как строку
  2. Создайте новую переменную SingleOutputStreamOperator с функцией Map. В котором я привожу дату (строку) к миллисекундам с помощью SingleDateFormat("").parse(). После этого я вычту, когда поездка закончилась в миллисекундах, из того, когда поездка началась, также в миллисекундах. Возврат SingleOutputStreamOperator из трех элементов: идентификатор/пройденное расстояние/время в миллисекундах
  3. Приведите этот DataStream к KeyedStream, используя Keyby(0). 0 — это идентификатор, который создаст два пакета: один с идентификатором 1, а другой — с идентификатором 2.
  4. Это часть, в которой я застрял. Мне нужно создать пакеты из 10 поездок
  5. Вычислите среднее расстояние и среднее время в пути для каждого из этих 10 пакетов поездок для идентификаторов 1 и 2. Их нельзя смешивать.

Примечания. Я пытался сделать это с Time Windows, но мне не удалось ограничить пакеты до 10 поездок.

Большое спасибо за помощь и время.

Если вам нужно что-то вроде кода, просто спросите!

Источник

Ответы (1)

avatar
David Anderson
1 июля 2021 в 19:31
0

Похоже, вы хотите использовать окна подсчета:

stream
  .keyBy(t -> t.f0)
  .countWindow(10)
  .process(...)

Есть также хороший пример в документации, показывающий, как сделать именно это с нуля (имеется в виду, если окна подсчета не были включены в API).