Вопросы, помеченные тегом «apache-kafka»

Apache Kafka - это распределенная потоковая платформа, предназначенная для хранения и обработки потоков данных с высокой пропускной способностью.
avatar
haq
9 августа 2021 в 06:58
235
2

Kafka-Connect Cassandra Sink Connector не передает данные в Cassandra

Я создал файл Kafka Standalone.properties для установления соединения. Файл размещается по адресу home/kafka/config/connect-standalone.properties, как указано ниже:...
avatar
manavchawla
9 августа 2021 в 02:51
78
1

Получите 1 сообщение из каждой темы, на которую вы подписаны в Kafka - KafkaConsumer

Я использую Java Kafka Consumer API для создания KafkaConsumer следующим образом, чтобы подписаться на все темы с этим шаблоном регулярного выражения: Pattern topicPattern = Pattern.compile("mssql-.*"); KafkaConsumer<String, JsonNode> consumer = new...
avatar
Debo
9 августа 2021 в 02:46
35
0

Способ подтвердить, что все Streams<Record> были успешно отправлены в kafka

Вот мой код загрузки Spring:Я использовал аннотацию Spring boot @Async @Async("concurrentTasks") public void fetchAndPost(){ Stream<Sample> sampledata=sampleRepository.fetchSampleDatafromDb(); sampledata.forEach(sample->{ ...
avatar
kushaq flier
9 августа 2021 в 01:27
402
1

Соединитель приемника поиска Kafka Elastic показывает высокую загрузку ЦП

Я использую соединитель эластичного поискового приемника в распределенном режиме (2 экземпляра). С задачей 8 и от 20 до 25 тем, которые нужно погрузить в эластичный поиск. Даже если нет записей для приема, рабочий процесс Java показывает 100% загрузку ЦП с...
avatar
Amos Shapira
9 августа 2021 в 00:47
143
1

Лидер раздела Kafka -1 ТОЛЬКО через SSL

Я работаю над переносом всего нашего трафика Kafka через SSL. У нас есть два кластера в каждом регионе. Использование Kafka версии 2.7.0. Все регионы и все кластеры нормально работают через SSL, кроме одного кластера. Среди других инструментов я использую kafkacat для...
avatar
SpongeBob
8 августа 2021 в 21:15
181
1

«Выстрелить и забыть» или «Опубликовать» Подписаться

Я знаю, что Fire And Forget — это шаблон, который позволяет добавить сообщение в очередь сообщений, не дожидаясь подтверждения. Кроме того, я знаю, что Pub/Sub — это шаблон, который публикуют издатели, а подписчики потребляют данные. В чем разница между этими двумя...
avatar
raikumardipak
8 августа 2021 в 21:04
553
1

Когда коммит фиксируется в KafkaListener.java в Spring Boot?

Мой вопрос состоит из двух частей. В приведенной ниже реализации Springboot KafkaListener, когда смещение фиксируется для стратегии смещения auto-offset-reset: latest и enable-auto-commit: true? Сразу после получения сообщения потребителем или после завершения всего...
avatar
Mehmood ulhassan
8 августа 2021 в 20:12
27
1

Хотите запустить команду ccloud для создания темы с помощью python

Я хочу запустить эту команду ccloud create topic mytopic используя питон
avatar
mkk05
8 августа 2021 в 10:13
156
1

Apache Flink с использованием Java — проблема с производительностью

У нас есть приложение flink, написанное на Java и работающее на AWS Kinesis Data Analytics. Приложение считывает входной поток из AWS Managed Service Kafka (тема kafka 1), затем применяет бизнес-логику (некоторые вычисления) и, наконец, записывает выходные данные в...
avatar
jeff
8 августа 2021 в 06:42
439
1

Как производитель Kafka находит идентификатор схемы для записи

Я вижу, что в Kafka Producer нам нужно указать только URL-адрес реестра схемы, а не схему, которую я бы указал. Таким образом, при сериализации записей производитель решает, какую схему использовать. Поскольку реестр схем может содержать несколько...
avatar
dz902
8 августа 2021 в 06:32
275
1

Соединительный коннектор приемника Kafka S3 выдает `java.lang.NoClassDefFoundError: com/google/common/base/Preconditions` при использовании формата Parquet

При использовании соединителя приемника Confluent S3 возникает следующая ошибка: [2021-08-08 02:25:15,588] ERROR WorkerSinkTask{id=s3-test-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover unt il manually restarted. Error:...
avatar
FRTDS
7 августа 2021 в 17:47
662
2

Реестр Confluent Schema не может подключиться к контейнеру Kafka

Я пытаюсь запустить службу Kafka с помощью Zookeper, Kafdrop и Schema Registry. Я заставил ее работать, установив Kafka, Zookeper и Kadrop в один контейнер, а затем установив Confluent Schema Registry в другом (со своей собственной Kafka, Zookeeper, Ksql Server и Rest...
avatar
Lucas Andrade
7 августа 2021 в 16:42
39
1

Деактивировать отправку и получение сообщений Kafka для тестирования в проекте загрузки Spring

Я хочу провести интеграционное тестирование в проекте весенней загрузки, где используется kafka. Для этих тестов я не хочу, чтобы какие-либо сообщения отправлялись или принимались от kafka. Есть ли способ настроить это?
avatar
VV-
7 августа 2021 в 13:07
88
1

Confluent Operator добавить дополнительный коннектор

У меня в OpenShift настроен конфлюентный оператор, и этот образ confluentinc/cp-int-container-operator:6.1.0.0 не включает все коннекторы. Как включить любой дополнительный соединитель в yaml, например, мне нужно включить соединение Debezium для mysql. confluent-hub...
avatar
riccardo.cardin
7 августа 2021 в 12:26
155
1

Зио-Кафка: Создание сообщений в тему

Я пытаюсь создать несколько сообщений в тему Kafka, используя библиотеку zio-kafka, версия 0.15.0. Очевидно, что мое понимание экосистемы ZIO неоптимально, потому что я не могу создать простое сообщение. Моя программа следующая: object KafkaProducerExample extends...
avatar
takeshi
7 августа 2021 в 08:20
134
1

UnixTime сохраняется как int, как запрашивать как Date. Кассандра CQLSH

Я строю конвейер: sensorData - брокер MQTT - Kafka - Cassandra. Полезная нагрузка передается как JSON, а при сохранении в Cassandra дата сохраняется как int. Я не могу получить удобочитаемую дату при запросе CQLSH. CREATE TABLE sensordata.mqttsensordata ( sensor...
avatar
user3276247
7 августа 2021 в 03:10
165
1

Измените хранилище состояний для Kafka Streams, чтобы использовать MongoDB

Хранилище состояний по умолчанию — rockdb. Я также мог найти хранилища состояний в памяти. Однако мне нужно выяснить, можно ли настроить потоки kafka для использования mongodb в качестве хранилища состояний, а также могу ли я сам определить эти коллекции.
avatar
Lucas Andrade
6 августа 2021 в 18:41
180
1

Потребитель Kafka обрабатывает все сообщения при запуске

Я новичок в Kafka и разрабатываю личный проект с несколькими сервисами, и связь между ними осуществляется через Kafka, и я использую Confluent для удаленного размещения Kafka. Все работает нормально, но когда я запускаю сервер, он пытается обработать все старые...
avatar
Sergii
6 августа 2021 в 14:40
400
1

Kafkajs - получить статистику (лаг)

В нашем приложении nest.js мы используем клиент kafkajs для kafka. Нам нужно получить статистику мониторинга шансов. Одна из метрик: lag. Попытка выяснить, дает ли kafkajs что-либо и ничего интересного. (Самое интересное в полезной нагрузке: timestamp, offset,...
avatar
user3817206
6 августа 2021 в 13:22
96
1

Spring Kafka Event после запуска всех слушателей

Я использую Spring-Kakfa для подключения кластера kakfa, и у меня есть требование выполнить часть кода после того, как все разделы тем всех тем будут назначены. возьмите следующий код, например @Component public class MyKafkaMessageListener1 { ...