Вот мой код загрузки Spring:Я использовал аннотацию Spring boot @Async
@Async("concurrentTasks")
public void fetchAndPost(){
Stream<Sample> sampledata=sampleRepository.fetchSampleDatafromDb();
sampledata.forEach(sample->{
ListenableFuture<SendResult<String,Object>> future=kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString());
handleResponse(future,partitionKey,sample)
});
}
public void handleResponse(Listenable<SendResult<String,Object>> future,String partitionkey,Sample sample){
future.addCallback(new ListenableFutureCallBack<SendResult<String,Object>>(){
@Override
public void onSuccess(SendResult<String,Object>){
log.info("Publised data successfully");
}
@Override
public void onFailure(SendResult<String,Object>){
log.info("Publised data failed");
}
});
}
А вот и класс конфигурации
@Configuration
public class AsyncConfig {
@Bean
public TaskExecutor concurrentTasks() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.initialize();
return executor;
}
}
Теперь, как узнать, что весь мой поток был опубликован в kafka. Потому что я должен уведомить клиента о том, что мой kafkaPosting завершен, чтобы он мог начать потреблять. Требование похоже на то, что мы должны уведомить клиента по электронной почте. Итак, как я могу это понять.
Я не могу использовать kafkaTemplate.send(kafkaTopic,partitionKey,sample.toString()).get(), так как это сильно замедляет мой процесс.