Kafka-Connect Cassandra Sink Connector не передает данные в Cassandra

avatar
haq
9 августа 2021 в 06:58
235
2
0

Я создал файл Kafka Standalone.properties для установления соединения. Файл размещается по адресу home/kafka/config/connect-standalone.properties, как указано ниже:

.
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/home/kafka-connect-cassandra-sink-1.4.0/kafka-connect-cassandra-sink-1.4.0.jar

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


#listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092

На втором этапе я добавил файл kafka-connect-cassandra-sink-1.4.0. Файл размещен по пути home/kafka-connect-cassandra-sink-1.4.0 , файл приведен ниже:

name=users-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=10

loadBalancing.localDc=datacenter1
contactPoints=localhost
port=9042

username=...
password=...

topics=demo
topic.demo.demo.users.mapping=lastname=value.lastname, firstname=value.firstname, email=value.email

На моем ПК уже запущен Zookeeper, и я также запустил Kafka с помощью следующей команды bin/kafka-server-start.sh config/connect-standalone.properties

Теперь для подключения kafka к коннектору стока я использовал следующую команду sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/cassandra-sink-standalone.properties &> standalone-mode.log &

Мой файл standalone-mode.log пуст (я предполагаю, что это означает отсутствие ошибок, потому что в предыдущих попытках в этом файле журнала были ошибки, которые я устранил).

Теперь для загрузки текстового файла через соединитель приемника kafka в cassandra я использовал следующую команду cat Desktop/users.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"; sleep 10;

Опять нет ошибок на терминале. Но теперь проблема в том, что когда я пишу запрос на просмотр данных в базе данных cassandra, таблица пуста:

image showing cqlsh quering showing nothing

файл user.txt приведен ниже:

Pruitt:{"lastname":"Pruitt", "firstname":"Allie", "email":"allie@example.com"}
Krause:{"lastname":"Krause", "firstname":"Duncan", "email":"duncan@example.com"}
Chase:{"lastname":"Chase", "firstname":"Juana", "email":"juana@example.com"}
Estrada:{"lastname":"Estrada", "firstname":"Edward", "email":"edward@example.com"}
Singleton:{"lastname":"Singleton", "firstname":"Marie", "email":"Marie@example.com"}
Poole:{"lastname":"Poole", "firstname":"Olivia", "email":"olivia@example.com"}
Marks:{"lastname":"Marks", "firstname":"Timothy", "email":"timothy@example.com"}
Suarez:{"lastname":"Suarez", "firstname":"Claud", "email":"claud@example.com"}
Sloan:{"lastname":"Sloan", "firstname":"Eloy", "email":"eloy@example.com"}
Rodriguez:{"lastname":"Rodriguez", "firstname":"Gale", "email":"gale@example.com"}
Bautista:{"lastname":"Bautista", "firstname":"Constance", "email":"Constance@example.com"}
Mcintyre:{"lastname":"Mcintyre", "firstname":"Donte", "email":"donte@example.com"}
Lang:{"lastname":"Lang", "firstname":"Willa", "email":"willa@example.com"}
Richmond:{"lastname":"Richmond", "firstname":"Dionne", "email":"dionne@example.com"}
Источник
OneCricketeer
9 августа 2021 в 09:52
0

Вы должны показать свой фактический файл users.txt, также не используйте &> standalone-mode.log & во время отладки, чтобы вы могли сразу увидеть журналы

haq
10 августа 2021 в 07:36
1

@OneCricketeer Я загрузил файл users.txt выше и после удаления &> standalone-mode.log и получил следующую ошибку key.converter=org.apache.kafka.connect.storage.StringConverter: команда не найдена kafka/config/connect -standalone.properties: строка 11: offset.flush.interval.ms=10000: команда не найдена kafka/config/connect-standalone.properties: строка 13: plugin.path=/kafka-connect-cassandra-sink-1.4.0 /kafka-connect-cassandra-sink-1.4.0.jar: Нет такого файла или каталога........ Все утверждения в connect-standalone.properties не найдены. Я перепроверил, все пути верны.

OneCricketeer
10 августа 2021 в 12:34
0

Пути не при чем. Смотрите мой ответ ниже

Ответы (2)

avatar
OneCricketeer
10 августа 2021 в 12:33
1

запустил Кафку с помощью следующей команды bin/kafka-server-start.sh config/connect-standalone.properties

Вы не запускаете брокеры со свойствами Connect, вам нужно использовать server.properties

Кроме того, вы проверили, действительно ли работает Kafka, создав темы и другие задачи, упомянутые в официальном кратком руководстве?

файл standalone-mode.log пуст (я предполагаю, что это означает отсутствие ошибок

Не обязательно. Вы не захватываете stderr с помощью этой команды. например 2>&1

теперь подключив kafka к коннектору стока, я использовал следующую команду sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/...

получил следующую ошибку: command not found kafka/config/connect-standalone.properties...

Файлы свойств не исполняемые. Вам нужно запустить connect-standalone.sh, который принимает эти два файла свойств подключения в качестве аргументов

.
haq
11 августа 2021 в 07:25
1

Спасибо за это понимание. Я не знал, что должен запускать server.properties и standalone. properties одновременно. Да, производитель и потребитель кафки работают нормально.

avatar
Erick Ramirez
9 августа 2021 в 07:22
1

В файле cassandra-sink-standalone.properties необходимо указать поставщика проверки подлинности, иначе по умолчанию будет None, что означает, что соединитель не будет выполнять проверку подлинности в кластере.

Исходя из того, что вы опубликовали, похоже, что вы используете поставщика простой аутентификации, поэтому установите следующее:

auth.provider=PLAIN
auth.username=username
auth.password=S0mePa$$word

Ура!

haq
9 августа 2021 в 07:40
0

Я добавил вышеуказанное в cassandra-sink-standalone.properties и перезапустил сервер kafka и снова выполнил команды sudo kafka/config/connect-standalone.properties kafka-connect-cassandra-sink-1.4.0/conf/cassandra-sink-standalone.properties &> standalone-mode.log и cat Desktop/users.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"; sleep 10;, и ошибки нет, но таблица по-прежнему пуста.

Erick Ramirez
10 августа 2021 в 11:01
0

Кассандра действительно слушает клиентов на localhost? Что вы установили для rpc_address?

haq
10 августа 2021 в 11:22
0

Насчет rpc_address не знаю, подскажите, пожалуйста, в каком конфигурационном файле это проверить?

Erick Ramirez
10 августа 2021 в 11:33
0

Он находится в conf/cassandra.yaml вашего кластера. Ваше здоровье!

haq
11 августа 2021 в 06:13
0

Да, я нашел этот файл, и для rpc установлено значение rpc_address: localhost.