Обработка ошибок в транзакционных прослушивателях Kafka

avatar
Jeremy
1 июля 2021 в 18:31
851
1
0

Этот вопрос следует за моим сообщением о политике запросов/ответов и повторных попыток для прослушивателей Kafka, но в контексте транзакционных прослушивателей Kafka (поэтому текущая реализация аналогична предлагаемому решению). По сути, идея состоит в том, чтобы иметь возможность поддерживать полное управление ошибками, которое, в зависимости от типа исключения, либо повторяет X раз запись, либо отправляет ее в тему недоставленных писем для исключения, поднятого внутри прослушивателя Kafka, помеченного @Transactional. Когда я указываю параметр errorHandler в свой @KafkaListener, я вижу, что он проходит через мою логику в первый раз, но затем, после отправки в тему недоставленных писем (и возврата моего пользовательского ответа в случае @ SendTo), он откатывает транзакцию и повторяет попытку обработки моей записи в соответствии с периодом BackOff DefaultAfterRollbackProcessor. Можно ли каким-либо образом предотвратить эти повторные попытки в случае, если исключение было правильно обработано, а затем просто продолжить следующую транзакцию?

Вот мои различные обработчики, определенные в соответствии с решением по приведенной выше ссылке:

    @Bean
    public ErrorHandler errorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler) {
        //set with retry policy higher than KafkaListenerErrorHandler
        return new SeekToCurrentErrorHandler((data, thrownException) -> {
                deadLetterQueueHandler.send(data, thrownException);
        }, new FixedBackOff(15000, 20));
    }

    @Bean
    public AfterRollbackProcessor<?  ?> afterRollbackProcessor(MyDeadLetterQueueHandler deadLetterQueueHandler) {
        //set with retry policy higher than KafkaListenerErrorHandler
        final var afterRollbackProcessor = new DefaultAfterRollbackProcessor<Object, Object>(((data, thrownException) -> {
                deadLetterQueueHandler.send(data, thrownException);
        }, new FixedBackOff(15000, 20));
        afterRollbackProcessor.setCommitRecovered(true);
        return afterRollbackProcessor;
    }

    
    @Primary
    KafkaListenerErrorHandler kafkaListenerErrorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler,
                                                        MyExceptionHandler exceptionHandler) {
        return (message, exception) -> {
            final var cause = (Exception) exception.getCause();
            final var consumerRecord = message.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
            if (shouldGoToDLT(cause)) {
                sendToDeadLetterTopic(deadLetterQueueHandler, consumerRecord, cause);
                return new CustomResponse(cause.getMessage());
                // should end transaction rollback and go to next transaction
            } else {
                // retry 10 times before killing the app
                var deliveryAttempt = message.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class);
                if (deliveryAttempt > 10) {
                    exceptionHandler.handle(cause);
                }
            }
            throw exception;
        };
    }

и журналы, которые я получаю в результате своего теста с использованием EmbeddedKafkaBroker и созданием исключения в прослушивателе @Transactional Kafka:

2021-07-01 17:12:34.791  INFO [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:34.812 ERROR [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
    ... 11 common frames omitted

2021-07-01 17:12:34.918  INFO [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-consumer-group-3, groupId=consumer-group] Seeking to offset 0 for partition rollback-db-employee-topic-0
2021-07-01 17:12:34.929  INFO [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:34.931 ERROR [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
    ... 11 common frames omitted

2021-07-01 17:12:35.034  INFO [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-consumer-group-3, groupId=consumer-group] Seeking to offset 0 for partition rollback-db-employee-topic-0
2021-07-01 17:12:35.445  INFO [,0beec62e5e3dbb97,4ef5c58e90699a09] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:35.448 ERROR [,0beec62e5e3dbb97,4ef5c58e90699a09] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
    ... 11 common frames omitted

…

Спасибо за помощь.

EDIT: Вот мой слушатель:

    @KafkaListener(topics = "user-topic", groupId = "consumer-group-1", errorHandler ="errorHandler")
    @Transactional
    public void onReceive(User command) {
        // update database
        userRepository.save(command);

        switch (command.getName()) {
            case "GOTODLT":
                var volunteerArithmeticException = 7 / 0;
                break;
            case "SHOULDRETRY":
                throw new IllegalStateException("Should be retried 10 times");
        }
    }

Источник

Ответы (1)

avatar
Gary Russell
1 июля 2021 в 19:13
1

Непонятно, почему вы используете @Transactional, поскольку вы используете ChainedKafkaTransactionManager (который, кстати, устарел; см. javadocs родительского класса). Его можно использовать, если вы знаете об ограничениях.

Транзакции уже запущены менеджерами транзакций, поэтому аннотация не требуется.

Поскольку ваш метод прослушивателя заключен в перехватчик транзакций, транзакция откатывается до того, как будет вызван обработчик ошибок вашего прослушивателя.

Отсюда и сообщение Transaction silently rolled back because it has been marked as rollback-only.

Удалите аннотацию, и она должна работать так, как вы ожидаете.

Не следует одновременно настраивать STCEH и ARP. Первый выполняется внутри транзакции, второй — после отката.