Стратегия водяных знаков Flink

avatar
ekjot
1 июля 2021 в 17:26
252
1
0

Мы создаем конвейер потоковой обработки для обработки сообщений Kinesis с использованием Flink v1.11 и характеристик времени события. При определении стратегии исходного водяного знака в официальной документации я наткнулся на две готовые стратегии водяных знаков; forBoundedOutOfOrderness и forMonotonousTimestamps. Но я не думаю, что они подходят для моего варианта использования, как я понимаю вышеизложенное. Вот подробности моего варианта использования:

Данные из входного потока: (содержащие данные с отметками времени для каждой минуты)

{11:00, Data1}
{11:01, Data2}
{11:00, Data3}
{11:00, Data4}
{11:01, Data5}
...

Теперь я хочу обработать окно (время события Tumbling: 1 мин) для 11:00-11:01, содержащее [Данные1, Данные3, Данные4], ровно через 20 секунд после прибытия первого события с отметкой времени 11:00. Точно так же следующее окно с 11:01 по 11:02, содержащее [Данные2, Данные5], должно быть выполнено через 20 секунд после появления первого события с отметкой времени 11:01. Возможна ли такая стратегия водяных знаков во Flink?

Источник

Ответы (1)

avatar
David Anderson
1 июля 2021 в 19:17
0

Вот как это реализовать:

В методе onEvent отслеживайте самую большую метку времени, замеченную до сих пор. И всякий раз, когда вы обновляете эту переменную, записывайте текущее системное время.

Затем, когда вызывается onPeriodicEmit (по умолчанию это будет вызываться каждые 200 мс), если с момента обновления текущей максимальной временной метки прошло 20 секунд, создается водяной знак, равный текущей максимальной временной метке плюс 1 секунда.

ekjot
3 июля 2021 в 12:42
0

Это замечательно. Спасибо за подробный ответ, Давид! Только один дополнительный вопрос, если наибольшая метка времени изменяется до первых 20 с предыдущей наибольшей метки времени (скажем, в текущее время = 11:01:00, наступает первое событие временной метки 11:00, а в текущее время = 11:01:10 , приходит первое событие с временной меткой события 11:01), водяной знак будет выдан через 20 секунд после второй по величине временной метки... верно? Нужно ли мне поддерживать карту maxTimestamps и текущих временных меток в том случае, когда водяной знак еще не выдается? или этот сценарий невозможен.

David Anderson
6 июля 2021 в 18:38
0

Да, если такое перекрытие возможно, вам может понадобиться использовать карту для обработки событий из разных временных диапазонов. Я предполагал более простой случай, когда события не будут демонстрировать такую ​​большую неупорядоченность.