[KAFKA-353] Sink Connector for Handling Tombstone messages Created: 14/Feb/23  Updated: 03/Apr/23  Resolved: 03/Apr/23

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Rajesh Vinayagam Assignee: Unassigned
Resolution: Duplicate Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
duplicates KAFKA-360 tombstones incompatible downstream in... Closed
Related
is related to KAFKA-360 tombstones incompatible downstream in... Closed

 Description   

Deletions on the source mongodb database creates the appropriate tombstone message with an _id.

But the document deletion does not happen in the sink database even after setting "delete.on.null.values": "true".

The connector version mongodb/kafka-connect-mongodb:1.9.0 ** 

Configurations

"Source Configs"

{
  "name""source_config_delete_v1",
  "config": {
    "name""source_config_delete_v1",
    "connector.class""com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max""1",
    "key.converter""org.apache.kafka.connect.storage.StringConverter",
    "value.converter""org.apache.kafka.connect.storage.StringConverter",
    "connection.uri""mongodb://localhost:27017",
    "database""j_trials",
    "collection""kafka_connect",
    "pipeline""[\{'$match': {'operationType': 'delete'}}, \{'$project': {'_id': 1,'fullDocument': 1,'ns': 1,} } ]",
    "publish.full.document.only""true",
    "publish.full.document.only.tombstone.on.delete""true",
    "topic.prefix""delete_v1",
    "topic.namespace.map""\{\"*\":\"j_trials.kafka_connect\"}",
    "copy.existing""true",
    "mongo.errors.log.enable""true"
  }
}

 

"Sink config"

{
  "name""sink_config_delete_new_v1",
  "config": {
    "key.converter.schemas.enable""false",
    "value.converter.schemas.enable""false",
    "name""sink_config_delete_new_v1",
    "connector.class""com.mongodb.kafka.connect.MongoSinkConnector",
    "key.converter""org.apache.kafka.connect.storage.StringConverter",
    "value.converter""org.apache.kafka.connect.storage.StringConverter",
    "topics""delete_v1.jpmc_trials.kafka_connect",
    "connection.uri""mongodb://localhost:27017",
    "database""j_trials_sink",
    "collection""kafka_connect",
    "delete.on.null.values""true",
    "document.id.strategy""com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
    "document.id.strategy.partial.key.projection.type""allowlist",
    "document.id.strategy.partial.key.projection.list""_id"
  }
}

 


Generated at Thu Feb 08 09:06:11 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.