Способ подтвердить, что все Streams<Record> были успешно отправлены в kafka

avatar
Debo
9 августа 2021 в 02:46
42
0
0

Вот мой код загрузки Spring:Я использовал аннотацию Spring boot @Async

     @Async("concurrentTasks")
    
        public void fetchAndPost(){
        Stream<Sample> sampledata=sampleRepository.fetchSampleDatafromDb();
        sampledata.forEach(sample->{
        
        ListenableFuture<SendResult<String,Object>> future=kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString());
        handleResponse(future,partitionKey,sample)
        });
        
        }
        
        public void handleResponse(Listenable<SendResult<String,Object>> future,String partitionkey,Sample sample){
        
        future.addCallback(new ListenableFutureCallBack<SendResult<String,Object>>(){
        
        @Override
        public void onSuccess(SendResult<String,Object>){
        log.info("Publised data successfully");
        
        }
        @Override
        public void onFailure(SendResult<String,Object>){
        log.info("Publised data failed");
        
        }
        
        
        });
        
        }


  

А вот и класс конфигурации

       @Configuration
        public class AsyncConfig {
            @Bean
            public TaskExecutor concurrentTasks() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(4);
                executor.setMaxPoolSize(4);
                executor.setThreadNamePrefix("default_task_executor_thread");
                executor.initialize();
                return executor;
            }
        }

Теперь, как узнать, что весь мой поток был опубликован в kafka. Потому что я должен уведомить клиента о том, что мой kafkaPosting завершен, чтобы он мог начать потреблять. Требование похоже на то, что мы должны уведомить клиента по электронной почте. Итак, как я могу это понять.

Я не могу использовать kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString()).get(), так как это сильно замедляет мой процесс.

Источник

Ответы (0)