
org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaSchemaHistory; check the logs for details
    at io.debezium.storage.kafka.history.KafkaSchemaHistory.configure(KafkaSchemaHistory.java:208)
    at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.getSchemaHistory(HistorizedRelationalDatabaseConnectorConfig.java:137)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:50)
    at io.debezium.connector.binlog.BinlogDatabaseSchema.<init>(BinlogDatabaseSchema.java:79)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:41)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:99)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:251)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:278)
    at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:78)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

 


The Debezium-to-DB connector registration command that caused the problem:

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
  "name": "{connector_name}",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "{MySQL_hostname_or_container_name}",
    "database.port": "{MySQL_port}",
    "database.user": "{MySQL_username}",
    "database.password": "{MySQL_password}",
    "database.server.id": "{MySQL_replication_server_ID(unique_number)}",
    "database.include.list": "{DB_name_to_capture}",
    "topic.prefix": "{Kafka_topic_prefix}",
    "database.server.name": "{Kafka_topic_server_name}",

    "database.history.kafka.bootstrap.servers": "{Kafka_host:port}",
    "database.history.kafka.topic": "{Kafka_schema_history_topic_name}",

    "include.schema.changes": "true"
  }
}'

 

This request is what produced the error.

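The cause was a Debezium version mismatch. In 1.x, the key for the Kafka schema history topic name is database.history.kafka.topic, while in 2.x it must be schema.history.internal.kafka.topic (2.x renamed the whole database.history.* prefix to schema.history.internal.*). So I changed the request as follows: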

 

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
  "name": "{connector_name}",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "{MySQL_hostname_or_container_name}",
    "database.port": "{MySQL_port}",
    "database.user": "{MySQL_username}",
    "database.password": "{MySQL_password}",
    "database.server.id": "{MySQL_replication_server_ID(unique_number)}",
    "database.include.list": "{DB_name_to_capture}",
    "topic.prefix": "{Kafka_topic_prefix}",
    "database.server.name": "{Kafka_topic_server_name}",

    "schema.history.internal.kafka.bootstrap.servers": "{Kafka_host:port}",
    "schema.history.internal.kafka.topic": "{Kafka_schema_history_topic_name}",

    "include.schema.changes": "true"
  }
}'

 

With this change, the connector works.
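
If a 1.x connector config has several database.history.* keys, the rename can be applied mechanically instead of by hand. The following is only a rough sketch: the function name migrate_history_keys and the file names in the usage line are hypothetical, and it assumes the keys appear as quoted JSON property names. It covers only the database.history.* to schema.history.internal.* prefix change, not other differences between 1.x and 2.x.

```shell
#!/bin/sh
# Hypothetical helper: reads a connector config (JSON) on stdin and
# rewrites every property name starting with "database.history." so that
# it starts with "schema.history.internal." instead.
# Only the opening-quote + prefix is matched, so keys such as
# "database.hostname" are left untouched.
migrate_history_keys() {
  sed 's/"database\.history\./"schema.history.internal./g'
}
```

For example, `migrate_history_keys < connector-1x.json > connector-2x.json` would write the converted config to a new file, which can then be passed to curl with `-d @connector-2x.json`.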
