Этот вопрос следует за моим сообщением о политике запросов/ответов и повторных попыток для прослушивателей Kafka, но в контексте транзакционных прослушивателей Kafka (поэтому текущая реализация аналогична предлагаемому решению). По сути, идея состоит в том, чтобы иметь возможность поддерживать полное управление ошибками, которое, в зависимости от типа исключения, либо повторяет X раз запись, либо отправляет ее в тему недоставленных писем для исключения, поднятого внутри прослушивателя Kafka, помеченного @Transactional. Когда я указываю параметр errorHandler в свой @KafkaListener, я вижу, что он проходит через мою логику в первый раз, но затем, после отправки в тему недоставленных писем (и возврата моего пользовательского ответа в случае @ SendTo), он откатывает транзакцию и повторяет попытку обработки моей записи в соответствии с периодом BackOff DefaultAfterRollbackProcessor. Можно ли каким-либо образом предотвратить эти повторные попытки в случае, если исключение было правильно обработано, а затем просто продолжить следующую транзакцию?
Вот мои различные обработчики, определенные в соответствии с решением по приведенной выше ссылке:
@Bean
public ErrorHandler errorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler) {
//set with retry policy higher than KafkaListenerErrorHandler
return new SeekToCurrentErrorHandler((data, thrownException) -> {
deadLetterQueueHandler.send(data, thrownException);
}, new FixedBackOff(15000, 20));
}
@Bean
public AfterRollbackProcessor<? ?> afterRollbackProcessor(MyDeadLetterQueueHandler deadLetterQueueHandler) {
//set with retry policy higher than KafkaListenerErrorHandler
final var afterRollbackProcessor = new DefaultAfterRollbackProcessor<Object, Object>(((data, thrownException) -> {
deadLetterQueueHandler.send(data, thrownException);
}, new FixedBackOff(15000, 20));
afterRollbackProcessor.setCommitRecovered(true);
return afterRollbackProcessor;
}
@Primary
KafkaListenerErrorHandler kafkaListenerErrorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler,
MyExceptionHandler exceptionHandler) {
return (message, exception) -> {
final var cause = (Exception) exception.getCause();
final var consumerRecord = message.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
if (shouldGoToDLT(cause)) {
sendToDeadLetterTopic(deadLetterQueueHandler, consumerRecord, cause);
return new CustomResponse(cause.getMessage());
// should end transaction rollback and go to next transaction
} else {
// retry 10 times before killing the app
var deliveryAttempt = message.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class);
if (deliveryAttempt > 10) {
exceptionHandler.handle(cause);
}
}
throw exception;
};
}
и журналы, которые я получаю в результате своего теста с использованием EmbeddedKafkaBroker и созданием исключения в прослушивателе @Transactional Kafka:
2021-07-01 17:12:34.791 INFO [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:34.812 ERROR [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
... 11 common frames omitted
2021-07-01 17:12:34.918 INFO [,0beec62e5e3dbb97,0beec62e5e3dbb97] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-consumer-group-3, groupId=consumer-group] Seeking to offset 0 for partition rollback-db-employee-topic-0
2021-07-01 17:12:34.929 INFO [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:34.931 ERROR [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
... 11 common frames omitted
2021-07-01 17:12:35.034 INFO [,0beec62e5e3dbb97,2d5e98ce0b91d04a] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-consumer-group-3, groupId=consumer-group] Seeking to offset 0 for partition rollback-db-employee-topic-0
2021-07-01 17:12:35.445 INFO [,0beec62e5e3dbb97,4ef5c58e90699a09] 19210 --- [ntainer#1-0-C-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-consumer-group.rollback-db-employee-topic.0, transactionalId=consumer-group.rollback-db-employee-topic.0] Aborting incomplete transaction
2021-07-01 17:12:35.448 ERROR [,0beec62e5e3dbb97,4ef5c58e90699a09] 19210 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.transaction.HeuristicCompletionException: Heuristic completion: outcome state is rolled back; nested exception is org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:195)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2072)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2041)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2017)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:168)
... 11 common frames omitted
…
Спасибо за помощь.
EDIT: Вот мой слушатель:
@KafkaListener(topics = "user-topic", groupId = "consumer-group-1", errorHandler ="errorHandler")
@Transactional
public void onReceive(User command) {
// update database
userRepository.save(command);
switch (command.getName()) {
case "GOTODLT":
var volunteerArithmeticException = 7 / 0;
break;
case "SHOULDRETRY":
throw new IllegalStateException("Should be retried 10 times");
}
}