|
Deletions on the source MongoDB database create the appropriate tombstone message with an _id.
But the document deletion does not happen in the sink database even after setting "delete.on.null.values": "true".
The connector version is mongodb/kafka-connect-mongodb:1.9.0.
Configurations
|
"Source Configs"
|
{
|
"name": "source_config_delete_v1",
|
"config": {
|
"name": "source_config_delete_v1",
|
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
|
"tasks.max": "1",
|
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
|
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
|
"connection.uri": "mongodb://localhost:27017",
|
"database": "j_trials",
|
"collection": "kafka_connect",
|
"pipeline": "[\{'$match': {'operationType': 'delete'}}, \{'$project': {'_id': 1,'fullDocument': 1,'ns': 1,} } ]",
|
"publish.full.document.only": "true",
|
"publish.full.document.only.tombstone.on.delete": "true",
|
"topic.prefix": "delete_v1",
|
"topic.namespace.map": "\{\"*\":\"j_trials.kafka_connect\"}",
|
"copy.existing": "true",
|
"mongo.errors.log.enable": "true"
|
}
|
}
|
|
"Sink config"
|
{
|
"name": "sink_config_delete_new_v1",
|
"config": {
|
"key.converter.schemas.enable": "false",
|
"value.converter.schemas.enable": "false",
|
"name": "sink_config_delete_new_v1",
|
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
|
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
|
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
|
"topics": "delete_v1.jpmc_trials.kafka_connect",
|
"connection.uri": "mongodb://localhost:27017",
|
"database": "j_trials_sink",
|
"collection": "kafka_connect",
|
"delete.on.null.values": "true",
|
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
|
"document.id.strategy.partial.key.projection.type": "allowlist",
|
"document.id.strategy.partial.key.projection.list": "_id"
|
}
|
}
|
|