I'm running into a problem setting up a CDC replication pipeline with Debezium and the MongoDB Kafka Sink Connector: update operations are not being replicated.

On the one hand, I have a MongoDB source database configured as a single-node replica set. Connected to the source DB, the [Debezium source connector](https://debezium.io/documentation/reference/stable/connectors/mongodb.html) streams all CDC events to a Kafka topic.

On the other hand, I have a MongoDB instance acting as the sink database. The sink database is fed by the [MongoDB Sink Connector](https://www.mongodb.com/docs/kafka-connector/current/sink-connector/) with the [Debezium MongoDB CDC Handler](https://www.mongodb.com/docs/kafka-connector/v1.2/sink-connector/fundamentals/change-data-capture/).

Insert and delete operations are replicated into the sink correctly. However, if I update a document in the source collection, the sink connector raises the exception shown below for the resulting CDC event.
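For reference, the failing event below corresponds to a single-field update on the source collection, roughly equivalent to the following (a sketch only: the host port 27040 comes from the compose file further down, and the admin/admin credentials from the source connector configuration):

    # Hypothetical reproduction of the update that produces the CDC event below,
    # run from the host against the source MongoDB (metrics-src, mapped to host port 27040).
    mongosh "mongodb://localhost:27040/metrics" \
      -u admin -p admin --authenticationDatabase admin \
      --eval 'db.customers.updateOne(
        { _id: NumberLong("1001") },           // document created during the initial snapshot
        { $set: { first_name: "Sallyddf" } }   // inserts and deletes replicate fine; this update does not
      )'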
**Debezium CDC Update Event**

    {
      "schema": {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "before" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "after" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "patch" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "filter" },
          {
            "type": "struct",
            "fields": [
              {
                "type": "array",
                "items": { "type": "string", "optional": false },
                "optional": true,
                "field": "removedFields"
              },
              { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "updatedFields" },
              {
                "type": "array",
                "items": {
                  "type": "struct",
                  "fields": [
                    { "type": "string", "optional": false, "field": "field" },
                    { "type": "int32", "optional": false, "field": "size" }
                  ],
                  "optional": false,
                  "name": "io.debezium.connector.mongodb.changestream.truncatedarray",
                  "version": 1
                },
                "optional": true,
                "field": "truncatedArrays"
              }
            ],
            "optional": true,
            "name": "io.debezium.connector.mongodb.changestream.updatedescription",
            "version": 1,
            "field": "updateDescription"
          },
          {
            "type": "struct",
            "fields": [
              { "type": "string", "optional": false, "field": "version" },
              { "type": "string", "optional": false, "field": "connector" },
              { "type": "string", "optional": false, "field": "name" },
              { "type": "int64", "optional": false, "field": "ts_ms" },
              {
                "type": "string",
                "optional": true,
                "name": "io.debezium.data.Enum",
                "version": 1,
                "parameters": { "allowed": "true,last,false,incremental" },
                "default": "false",
                "field": "snapshot"
              },
              { "type": "string", "optional": false, "field": "db" },
              { "type": "string", "optional": true, "field": "sequence" },
              { "type": "string", "optional": false, "field": "rs" },
              { "type": "string", "optional": false, "field": "collection" },
              { "type": "int32", "optional": false, "field": "ord" },
              { "type": "string", "optional": true, "field": "lsid" },
              { "type": "int64", "optional": true, "field": "txnNumber" }
            ],
            "optional": false,
            "name": "io.debezium.connector.mongo.Source",
            "field": "source"
          },
          { "type": "string", "optional": true, "field": "op" },
          { "type": "int64", "optional": true, "field": "ts_ms" },
          {
            "type": "struct",
            "fields": [
              { "type": "string", "optional": false, "field": "id" },
              { "type": "int64", "optional": false, "field": "total_order" },
              { "type": "int64", "optional": false, "field": "data_collection_order" }
            ],
            "optional": true,
            "name": "event.block",
            "version": 1,
            "field": "transaction"
          }
        ],
        "optional": false,
        "name": "src.metrics.customers.Envelope"
      },
      "payload": {
        "before": null,
        "after": "{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sallyddf\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}",
        "patch": null,
        "filter": null,
        "updateDescription": {
          "removedFields": null,
          "updatedFields": "{\"first_name\": \"Sallyddf\"}",
          "truncatedArrays": null
        },
        "source": { "version": "2.0.0.Final", "connector": "mongodb", "name": "src", "ts_ms": 1669244642000, "snapshot": "false", "db": "metrics", "sequence": null, "rs": "rs0", "collection": "customers", "ord": 2, "lsid": null, "txnNumber": null },
        "op": "u",
        "ts_ms": 1669244642381,
        "transaction": null
      }
    }
**Sink Connector Exception**

    ERROR Unable to process record SinkRecord{kafkaOffset=4, timestampType=CreateTime} ConnectRecord{topic='src.metrics.customers', kafkaPartition=0, key={id=1001}, keySchema=null, value=Struct{after={"_id": {"$numberLong": "1001"},"first_name": "Sallyddf","last_name": "Thomas","email": "sally.thomas@acme.com"},updateDescription=Struct{updatedFields={"first_name": "Sallyddf"}},source=Struct{version=2.0.0.Final,connector=mongodb,name=src,ts_ms=1669244642000,snapshot=false,db=metrics,rs=rs0,collection=customers,ord=2},op=u,ts_ms=1669244642381}, valueSchema=Schema{src.metrics.customers.Envelope:STRUCT}, timestamp=1669244642856, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
    metrics-sink-connect | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
    metrics-sink-connect | at java.base/java.util.Optional.flatMap(Optional.java:294)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    metrics-sink-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    metrics-sink-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    metrics-sink-connect | at java.base/java.lang.Thread.run(Thread.java:829)
    metrics-sink-connect | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
    metrics-sink-connect | at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
    metrics-sink-connect | at org.bson.BsonValue.asString(BsonValue.java:69)
    metrics-sink-connect | at org.bson.BsonDocument.getString(BsonDocument.java:252)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
    metrics-sink-connect | ... 22 more
    metrics-sink-connect | [2022-11-23 23:04:02,876] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value expected to be of type STRING is of unexpected type NULL (org.apache.kafka.connect.runtime.WorkerSinkTask)
    metrics-sink-connect | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
    metrics-sink-connect | at java.base/java.util.Optional.flatMap(Optional.java:294)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    metrics-sink-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    metrics-sink-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    metrics-sink-connect | at java.base/java.lang.Thread.run(Thread.java:829)
    metrics-sink-connect | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
    metrics-sink-connect | at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
    metrics-sink-connect | at org.bson.BsonValue.asString(BsonValue.java:69)
    metrics-sink-connect | at org.bson.BsonDocument.getString(BsonDocument.java:252)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
    metrics-sink-connect | ... 22 more
    metrics-sink-connect | [2022-11-23 23:04:02,878] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
    metrics-sink-connect | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    metrics-sink-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    metrics-sink-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    metrics-sink-connect | at java.base/java.lang.Thread.run(Thread.java:829)
    metrics-sink-connect | Caused by: org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
    metrics-sink-connect | at java.base/java.util.Optional.flatMap(Optional.java:294)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
    metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    metrics-sink-connect | ... 10 more
    metrics-sink-connect | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
    metrics-sink-connect | at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
    metrics-sink-connect | at org.bson.BsonValue.asString(BsonValue.java:69)
    metrics-sink-connect | at org.bson.BsonDocument.getString(BsonDocument.java:252)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
    metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
    metrics-sink-connect | ... 22 more
I have followed the examples and documentation for both Debezium and the MongoDB Sink Connector, and I still have no clue why this is happening.
Please find my Dockerfiles and configurations below:
**Debezium Sink Connector Dockerfile**

    FROM quay.io/debezium/connect:2.0
    ENV KAFKA_CONNECT_MONGODB_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-mongodb

    USER root
    RUN microdnf -y install git maven java-11-openjdk-devel && microdnf clean all
    USER kafka

    RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && cd $KAFKA_CONNECT_MONGODB_DIR && \
        git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git && \
        cd kafka-connect-mongodb && \
        git fetch --tags && \
        git checkout tags/v1.2.0 && \
        mvn clean package -DskipTests=true -DskipITs=true && \
        mv target/kafka-connect-mongodb/kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar $KAFKA_CONNECT_MONGODB_DIR && \
        cd .. && rm -rf $KAFKA_CONNECT_MONGODB_DIR/kafka-connect-mongodb
**MongoDB Kafka Sink Connector Dockerfile**

    FROM confluentinc/cp-kafka-connect:7.2.2
    RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
    ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
**Debezium Source Connector Configuration**

    {
      "name": "metrics",
      "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.name": "metrics-src",
        "mongodb.user": "admin",
        "mongodb.password": "admin",
        "mongodb.authsource": "admin",
        "mongodb.hosts": "rs0/metrics-src:27017",
        "topic.prefix": "src",
        "database.include.list": "metrics"
      }
    }
**MongoDB Sink Configuration with CDC Handler**

    {
      "name": "metrics",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
        "connection.uri": "mongodb://metrics-sink:27017/metrics",
        "database": "metrics",
        "collection": "metrics",
        "topics": "src.metrics.customers"
      }
    }
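Both connectors are registered through the Kafka Connect REST API in the usual way, roughly as follows (a sketch; the JSON file names are placeholders for the two configurations above, and ports 8083/8084 are the host mappings from the compose file below):

    # Register the Debezium source connector on the Debezium Connect worker (host port 8083)
    curl -i -X POST -H "Content-Type: application/json" \
         --data @debezium-source.json http://localhost:8083/connectors

    # Register the MongoDB sink connector on the Confluent Connect worker (host port 8084)
    curl -i -X POST -H "Content-Type: application/json" \
         --data @mongodb-sink.json http://localhost:8084/connectors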
**Docker Compose File**

    version: '3.4'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper
        restart: always
        networks:
          - sync-network
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOO_4LW_COMMANDS_WHITELIST: "*"
          KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
        healthcheck:
          test: nc -z localhost 2181 || exit -1
          interval: 10s
          timeout: 5s
          retries: 3
          start_period: 10s
        extra_hosts:
          - "moby:127.0.0.1"
      broker:
        image: confluentinc/cp-kafka:7.0.1
        container_name: broker
        restart: always
        networks:
          - sync-network
        ports:
          - "9092:9092"
          - "39092:39092"
        depends_on:
          zookeeper:
            condition: service_healthy
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://broker:19092,EXTERNAL_LISTENER://0.0.0.0:39092
          KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://localhost:19092,EXTERNAL_LISTENER://150.230.85.73:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,HOST_LISTENER:PLAINTEXT,EXTERNAL_LISTENER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
        extra_hosts:
          - "moby:127.0.0.1"
        healthcheck:
          test: echo "ruok" | timeout 2 nc -w 2 zookeeper 2181 | grep imok
          interval: 10s
          timeout: 5s
          retries: 3
      kafdrop:
        image: obsidiandynamics/kafdrop:latest
        container_name: kafdrop
        # network_mode: host
        ports:
          - 9000:9000
        networks:
          - sync-network
        depends_on:
          broker:
            condition: service_healthy
        environment:
          KAFKA_BROKERCONNECT: broker:9092
      metrics-src:
        image: mongo:5.0.5
        hostname: metrics-src
        restart: always
        container_name: metrics-src
        ports:
          - 27040:27017
        networks:
          - sync-network
        environment:
          MONGO_INITDB_DATABASE: metrics
        volumes:
          - ./scripts:/scripts
        healthcheck:
          test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
          interval: 10s
          start_period: 30s
        command: --replSet rs0 --bind_ip_all
      metrics-sink:
        image: mongo:5.0.5
        hostname: metrics-sink
        restart: always
        container_name: metrics-sink
        ports:
          - 27020:27017
        networks:
          - sync-network
        environment:
          MONGO_INITDB_DATABASE: metrics
        volumes:
          - ./scripts:/scripts
        healthcheck:
          test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
          interval: 10s
          start_period: 30s
        command: --replSet rs0 --bind_ip_all
      metrics-src-connect:
        image: quay.io/debezium/connect:2.0
        container_name: metrics-connect
        ports:
          - 8083:8083
        links:
          - broker
          - metrics-src
        networks:
          - sync-network
        volumes:
          - kafka-src-config:/kafka/config
        environment:
          - BOOTSTRAP_SERVERS=broker:9092
          - REST_HOST_NAME=0.0.0.0
          - GROUP_ID=1
          - CONFIG_STORAGE_TOPIC=metrics_src_connect_configs
          - OFFSET_STORAGE_TOPIC=metrics_src_connect_offsets
          - STATUS_STORAGE_TOPIC=metrics_src_connect_status
          - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
          - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
          - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
          - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      # container with mongo kafka plugins
      metrics-sink-connect:
        image: confluentinc/cp-kafka-connect-base:7.2.2
        build:
          context: ./mongodb-kafka-connect
        ports:
          - "8084:8083"
        hostname: metrics-sink-connect
        container_name: metrics-sink-connect
        depends_on:
          - zookeeper
          - broker
        networks:
          - sync-network
        volumes:
          - kafka-sink-config:/kafka/config
        environment:
          KAFKA_JMX_PORT: 35000
          KAFKA_JMX_HOSTNAME: localhost
          CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
          CONNECT_REST_ADVERTISED_HOST_NAME: metrics-sink-connect
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: connect-cluster-group
          CONNECT_CONFIG_STORAGE_TOPIC: metrics_sink_connect_configs
          CONNECT_OFFSET_STORAGE_TOPIC: metrics_sink_connect_offsets
          CONNECT_STATUS_STORAGE_TOPIC: metrics_sink_connect_status
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_METADATA_MAX_AGE_MS: 180000
          CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
          CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
          CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
          CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
          CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    networks:
      sync-network:
        driver: bridge
    volumes:
      kafka-sink-config:
        driver: local
        driver_opts:
          type: none
          o: bind
          device: ./kafka/sink-config
      kafka-src-config:
        driver: local
        driver_opts:
          type: none
          o: bind
          device: ./kafka/src-config
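For completeness, this is roughly how I bring the stack up and check the connector state on both Connect workers (a sketch; the connector name `metrics` and the host ports come from the configurations above):

    # Build and start the whole stack
    docker compose up -d --build

    # Check connector/task status on each Connect worker via the standard REST API
    curl -s http://localhost:8083/connectors/metrics/status   # Debezium source worker
    curl -s http://localhost:8084/connectors/metrics/status   # MongoDB sink worker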