[KAFKA-299] MongoDB Sink Connector can not support Debezium update event Created: 07/Mar/22  Updated: 11/Jul/23  Resolved: 10/Jul/23

Status: Closed
Project: Kafka Connector
Component/s: CDC
Affects Version/s: None
Fix Version/s: None

Type: Bug Priority: Minor - P4
Reporter: Caspar Chang Assignee: Ross Lawley
Resolution: Won't Fix Votes: 0
Labels: internal-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
is related to SERVER-47726 Log delta-style oplog entries from pi... Closed
Quarter: FY24Q2

 Description   

Hi, I built an environment on my own to test the Debezium MongoDB Connector with the MongoDB Sink Connector.

I found that inserts and deletes work well, but an update raises an error and terminates the sink task, as shown below.

Maybe it's a version mismatch. Could you tell me which version combination is documented as stable?

[2022-03-07 15:50:05,144] ERROR [mongo-sink-deb|task-0] Failed to put into the sink the following records: [SinkRecord{kafkaOffset=1, timestampType=CreateTime} ConnectRecord{topic='ktb.deb.source', kafkaPartition=0, key={id={"$oid": "62262999eb952841bb6b6a0e"}}, keySchema=null, value={patch={"$v": 2,"diff": {"i": {"age": "111"}}}, filter={"_id": {"$oid": "62262999eb952841bb6b6a0e"}}, op=u, after=null, source={ord=3, rs=rs777, h=null, stxnid=679518be-7538-31f7-ad33-b6ead6fe2bf6:1, collection=source, version=1.6.4.Final, sequence=null, connector=mongodb, name=ktb, tord=null, ts_ms=1646668204000, snapshot=false, db=deb}, ts_ms=1646668204960, transaction=null}, valueSchema=null, timestamp=1646668205135, headers=ConnectHeaders(headers=)}] (com.mongodb.kafka.connect.sink.MongoSinkTask:181)
java.lang.IllegalArgumentException: Invalid BSON field name diff
 



 Comments   
Comment by Ross Lawley [ 11/Jul/23 ]

To clarify: since version 1.8.0 the Kafka connector has supported change-stream-based change events, which are now the default in Debezium.
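For reference, a minimal sink configuration sketch for connector plugin 1.8.0 or later, using the same Debezium MongoDB CDC handler class that appears later in this ticket; the connector name, connection URI, database, and collection values below are illustrative, not taken from this issue:

{
  "name": "mongo-sink-debezium-cdc",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connection.uri": "mongodb://sink-host:27017",
    "database": "target",
    "collection": "customers",
    "topics": "src.metrics.customers"
  }
}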

Comment by Ross Lawley [ 10/Jul/23 ]

Marking as "Won't Fix" the Debezium patch version was in some flux but has now stabilized.

Debezium 0.10 introduced a few breaking
changes to the structure of the source block in order to unify the exposed structure across all the connectors.
By setting this option to v1 the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.
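A minimal sketch of how that legacy structure could be requested on the Debezium source connector. The option name source.struct.version is an assumption taken from the Debezium documentation being paraphrased above, not something configured in this ticket, and per the note above the setting is not recommended:

{
  "name": "metrics",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/metrics-src:27017",
    "database.include.list": "metrics",
    "source.struct.version": "v1"
  }
}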

KAFKA-379 will add support for the latest, unified format of Debezium events.

Comment by Paulo Henrique Favero Pereira [ 25/Nov/22 ]

I'm experiencing a similar problem setting up a CDC replication pipeline with Debezium and the Kafka Sink Connector, but I'm having problems with update operations.

On one hand, I have a MongoDB source database configured as a single-node replica set. Connected to the source DB, I have the [Debezium source connector](https://debezium.io/documentation/reference/stable/connectors/mongodb.html), which streams all CDC events to a Kafka topic.

On the other hand, I have a MongoDB instance acting as the sink database. The sink database is fed by the [MongoDB Sink Connector](https://www.mongodb.com/docs/kafka-connector/current/sink-connector/) with the [Debezium MongoDB CDC Handler](https://www.mongodb.com/docs/kafka-connector/v1.2/sink-connector/fundamentals/change-data-capture/).

The source data is properly replicated into the sink only for insert and delete operations. If I try to update a document in the source collection, the sink connector raises the following exception for that CDC event:

DEBEZIUM CDC UPDATE EVENT

 

{
   "schema": {
      "type": "struct",
      "fields": [
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "before"
         },
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "after"
         },
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "patch"
         },
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "filter"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "array",
                  "items": {
                     "type": "string",
                     "optional": false
                  },
                  "optional": true,
                  "field": "removedFields"
               },
               {
                  "type": "string",
                  "optional": true,
                  "name": "io.debezium.data.Json",
                  "version": 1,
                  "field": "updatedFields"
               },
               {
                  "type": "array",
                  "items": {
                     "type": "struct",
                     "fields": [
                        {
                           "type": "string",
                           "optional": false,
                           "field": "field"
                        },
                        {
                           "type": "int32",
                           "optional": false,
                           "field": "size"
                        }
                     ],
                     "optional": false,
                     "name": "io.debezium.connector.mongodb.changestream.truncatedarray",
                     "version": 1
                  },
                  "optional": true,
                  "field": "truncatedArrays"
               }
            ],
            "optional": true,
            "name": "io.debezium.connector.mongodb.changestream.updatedescription",
            "version": 1,
            "field": "updateDescription"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": false,
                  "field": "version"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "connector"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "name"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "ts_ms"
               },
               {
                  "type": "string",
                  "optional": true,
                  "name": "io.debezium.data.Enum",
                  "version": 1,
                  "parameters": {
                     "allowed": "true,last,false,incremental"
                  },
                  "default": "false",
                  "field": "snapshot"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "db"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "sequence"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "rs"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "collection"
               },
               {
                  "type": "int32",
                  "optional": false,
                  "field": "ord"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "lsid"
               },
               {
                  "type": "int64",
                  "optional": true,
                  "field": "txnNumber"
               }
            ],
            "optional": false,
            "name": "io.debezium.connector.mongo.Source",
            "field": "source"
         },
         {
            "type": "string",
            "optional": true,
            "field": "op"
         },
         {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": false,
                  "field": "id"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "total_order"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "data_collection_order"
               }
            ],
            "optional": true,
            "name": "event.block",
            "version": 1,
            "field": "transaction"
         }
      ],
      "optional": false,
      "name": "src.metrics.customers.Envelope"
   },
   "payload": {
      "before": null,
      "after": "{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sallyddf\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}",
      "patch": null,
      "filter": null,
      "updateDescription": {
         "removedFields": null,
         "updatedFields": "{\"first_name\": \"Sallyddf\"}",
         "truncatedArrays": null
      },
      "source": {
         "version": "2.0.0.Final",
         "connector": "mongodb",
         "name": "src",
         "ts_ms": 1669244642000,
         "snapshot": "false",
         "db": "metrics",
         "sequence": null,
         "rs": "rs0",
         "collection": "customers",
         "ord": 2,
         "lsid": null,
         "txnNumber": null
      },
      "op": "u",
      "ts_ms": 1669244642381,
      "transaction": null
   }
}

 

 

Sink Connector Exception:

 

ERROR Unable to process record SinkRecord{kafkaOffset=4, timestampType=CreateTime} ConnectRecord{topic='src.metrics.customers', kafkaPartition=0, key={id=1001}, keySchema=null, value=Struct{after={"_id": {"$numberLong": "1001"},"first_name": "Sallyddf","last_name": "Thomas","email": "sally.thomas@acme.com"},updateDescription=Struct{updatedFields={"first_name": "Sallyddf"}},source=Struct{version=2.0.0.Final,connector=mongodb,name=src,ts_ms=1669244642000,snapshot=false,db=metrics,rs=rs0,collection=customers,ord=2},op=u,ts_ms=1669244642381}, valueSchema=Schema{src.metrics.customers.Envelope:STRUCT}, timestamp=1669244642856, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
metrics-sink-connect    | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect    |       at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect    |       at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect    |       ... 22 more
metrics-sink-connect    | [2022-11-23 23:04:02,876] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value expected to be of type STRING is of unexpected type NULL (org.apache.kafka.connect.runtime.WorkerSinkTask)
metrics-sink-connect    | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect    |       at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect    |       at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect    |       ... 22 more
metrics-sink-connect    | [2022-11-23 23:04:02,878] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
metrics-sink-connect    | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       ... 10 more
metrics-sink-connect    | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect    |       at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect    |       at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect    |       ... 22 more

 

 

I followed all the examples and documentation for Debezium and the MongoDB Sink Connector, and I still have no clue why this is happening.

Please find below the dockerfiles and my configurations:

Debezium Sink Connector Dockerfile

 

FROM quay.io/debezium/connect:2.0
ENV KAFKA_CONNECT_MONGODB_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-mongodb
USER root
RUN microdnf -y install git maven java-11-openjdk-devel && microdnf clean all
USER kafka
 
 
RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && cd $KAFKA_CONNECT_MONGODB_DIR && \
  git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git && \
  cd kafka-connect-mongodb && \
  git fetch --tags && \
  git checkout tags/v1.2.0 && \
  mvn clean package -DskipTests=true -DskipITs=true && \
  mv target/kafka-connect-mongodb/kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar $KAFKA_CONNECT_MONGODB_DIR && \
  cd .. && rm -rf $KAFKA_CONNECT_MONGODB_DIR/kafka-connect-mongodb 


MongoDB Kafka Sink Connector Dockerfile

FROM confluentinc/cp-kafka-connect:7.2.2
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

Debezium Source Connector Configuration

 

{
  "name": "metrics",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.name": "metrics-src",
    "mongodb.user": "admin",
    "mongodb.password": "admin",
    "mongodb.authsource": "admin",
    "mongodb.hosts": "rs0/metrics-src:27017",
    "topic.prefix": "src",
    "database.include.list": "metrics"
  }
}

MongoDB Sink Configuration with CDC Handler

{
  "name": "metrics",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connection.uri": "mongodb://metrics-sink:27017/metrics",
    "database": "metrics",
    "collection": "metrics",
    "topics": "src.metrics.customers"
  }
}

Docker Compose File

 

version: '3.4'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    restart: always
    networks:
        - sync-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOO_4LW_COMMANDS_WHITELIST: "*"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
    healthcheck:
      test: nc -z localhost 2181 || exit -1
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s
    extra_hosts:
      - "moby:127.0.0.1"
  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    restart: always
    networks:
        - sync-network
    ports:
      - "9092:9092"
      - "39092:39092"
    depends_on:
        zookeeper:
          condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://broker:19092,EXTERNAL_LISTENER://0.0.0.0:39092
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://localhost:19092,EXTERNAL_LISTENER://150.230.85.73:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,HOST_LISTENER:PLAINTEXT,EXTERNAL_LISTENER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"
    healthcheck:
      test: echo "ruok" | timeout 2 nc -w 2 zookeeper 2181 | grep imok
      interval: 10s
      timeout: 5s
      retries: 3
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    # network_mode: host
    ports:
      - 9000:9000
    networks:
      - sync-network
    depends_on:
        broker:
          condition: service_healthy
    environment:
      KAFKA_BROKERCONNECT: broker:9092
  metrics-src:
    image: mongo:5.0.5
    hostname: metrics-src
    restart: always
    container_name: metrics-src
    ports:
      - 27040:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all
  metrics-sink:
    image: mongo:5.0.5
    hostname: metrics-sink
    restart: always
    container_name: metrics-sink
    ports:
      - 27020:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all
  metrics-src-connect:
    image: quay.io/debezium/connect:2.0
    container_name: metrics-connect
    ports:
     - 8083:8083
    links:
     - broker
     - metrics-src
    networks:
      - sync-network
    volumes:
     - kafka-src-config:/kafka/config
    environment:
     - BOOTSTRAP_SERVERS=broker:9092
     - REST_HOST_NAME=0.0.0.0
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=metrics_src_connect_configs
     - OFFSET_STORAGE_TOPIC=metrics_src_connect_offsets
     - STATUS_STORAGE_TOPIC=metrics_src_connect_status
     - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
     - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
     - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
  # container with mongo kafka plugins
  metrics-sink-connect:
    image: confluentinc/cp-kafka-connect-base:7.2.2
    build:
      context: ./mongodb-kafka-connect
    ports:
      - "8084:8083"
    hostname: metrics-sink-connect
    container_name: metrics-sink-connect
    depends_on:
      - zookeeper
      - broker
    networks:
      - sync-network
    volumes:
     - kafka-sink-config:/kafka/config
    environment:
      KAFKA_JMX_PORT: 35000
      KAFKA_JMX_HOSTNAME: localhost
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: metrics-sink-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: metrics_sink_connect_configs
      CONNECT_OFFSET_STORAGE_TOPIC: metrics_sink_connect_offsets
      CONNECT_STATUS_STORAGE_TOPIC: metrics_sink_connect_status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_METADATA_MAX_AGE_MS: 180000
      CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
networks:
  sync-network:
    driver: bridge
volumes:
  kafka-sink-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/sink-config
  kafka-src-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/src-config

Comment by Ross Lawley [ 24/Mar/22 ]

Note: MongoDB 5.0 uses a different format for oplog update entries. The CDC handler needs to be updated to handle the new diff field.
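To illustrate the difference: the 5.0+ entry below is copied from the error log in the description, while the pre-5.0 operator form is a hedged example, not taken from this ticket. Before 5.0, an update oplog entry carried operator-style documents such as

{ "$set": { "age": "111" } }

whereas 5.0+ emits delta-style entries with a "$v": 2 marker and a diff document, which Debezium forwards in the patch field and which the current oplog-based CDC handler rejects with "Invalid BSON field name diff":

{ "$v": 2, "diff": { "i": { "age": "111" } } }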

Generated at Thu Feb 08 09:06:03 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.