Когда коммит фиксируется в KafkaListener.java в Spring Boot?

avatar
raikumardipak
8 августа 2021 в 21:04
542
1
2

Мой вопрос состоит из двух частей.

  1. В приведенной ниже реализации Springboot KafkaListener, когда смещение фиксируется для стратегии смещения auto-offset-reset: latest и enable-auto-commit: true? Сразу после получения сообщения потребителем или после завершения всего метода, реализующего KafkaListener?

KafkaConsumer.java

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public ResponseEntity<String> consume(String message) {
        log.info("Message recieved from Kafka topic {}", message); // offset committed HERE?
        KafkaResponse kafkaResponse = new Gson().fromJson(message, KafkaResponse.class);
        myBusinessService.processKafkaResponse(kafkaResponse);
        return new ResponseEntity<>("Successfully Received", HttpStatus.OK);// OR offset committed HERE?
    }
  1. В application.yml для следующих свойств, какие утверждения являются верными/ложными/какой правильный ответ?:
max-poll-records: 100
max-poll-interval-ms: 200000
enable-auto-commit: true 
auto-commit-interval: 3000
auto-offset-reset: latest
isolation-level: READ_UNCOMMITTED
fetch-max-bytes: 52428800
  • Между auto-offset-reset: latest и auto-commit-interval: 3000, если потребитель Kafka сломается до 3 секунд, то никакое смещение не будет зафиксировано для любой записи, обработанной в течение этих 3 секунд?
  • Между max-poll-interval-ms: 200000 и auto-commit-interval: 3000 потребитель Kafka наверняка будет опрашивать брокера после интервала в 200000 секунд, даже если потребитель не завершил фиксацию текущей партии смещений через 3000 секунд?<4385679090
  • Между комбинацией max-poll-records: 100 и fetch-max-bytes: 52428800 какая будет иметь приоритет, если 99-я запись превысит 52428800 байт?

Заранее спасибо!

Источник

Ответы (1)

avatar
Gary Russell
9 августа 2021 в 16:10
1

Лучше не использовать enable.auto.commit=true. Spring фиксирует смещения более детерминированным образом либо после обработки всех записей опроса (по умолчанию — AckMode.BATCH), либо после обработки каждой записи AckMode.RECORD.

enable.auto.commit не будет зафиксировано до следующего poll() и только в том случае, если auto.commit.interval прошло.

raikumardipak
10 августа 2021 в 14:32
0

Разве весь пакет не откатывается на 100 записей как таковых, когда max-poll-records=100, даже если я выбираю ручную фиксацию?

Gary Russell
10 августа 2021 в 15:07
1

Нет; если вы используете AckMode.MANUAL, любые успешные подтверждения, вызванные до сбоя, ставятся в очередь и фиксируются перед вызовом обработчика ошибок с оставшимися записями; поиск выполняется только для этих записей. Если вы используете AckMode.MANUAL_IMMEDIATE, смещения фиксируются немедленно, пока вы вызываете его в потоке прослушивателя.