[KAFKA-313] Sink Connector Configuration on sharded collection Created: 05/May/22  Updated: 27/Oct/23  Resolved: 11/Apr/23

Status: Closed
Project: Kafka Connector
Component/s: Configuration, Documentation, Sink
Affects Version/s: 1.7.0
Fix Version/s: None

Type: Question Priority: Major - P3
Reporter: Rajesh Vinayagam Assignee: Robert Walters
Resolution: Gone away Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

We are trying to use the Mongo Sink Connector for Kafka to sink messages from a Kafka topic. We are currently using the default configuration, i.e., the default write model strategy. This configuration works for an unsharded collection, but for a sharded collection we encounter the following error:

Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}].
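As we understand MongoDB's rules, an upsert on a sharded collection is routed by the shard key, so its filter must pin every shard key field to an exact value (a filter on _id alone is enough only when _id is the shard key). The Python sketch below illustrates why the error appears, assuming the default replace strategy filters on _id only. The helper targets_single_shard and the shard key fields region and customerId are hypothetical, chosen for illustration.

```python
def targets_single_shard(filter_doc, shard_key_fields):
    """Return True if filter_doc pins every shard key field to an exact value.

    Simplified check: dotted paths are not handled, and any sub-document whose
    keys start with "$" (e.g. {"$gt": 5}) is treated as a non-exact predicate.
    """
    for field in shard_key_fields:
        if field not in filter_doc:
            return False
        value = filter_doc[field]
        if isinstance(value, dict) and any(k.startswith("$") for k in value):
            return False
    return True


# Hypothetical shard key and sink record, for illustration only.
shard_key = ["region", "customerId"]
record = {"_id": 101, "region": "EU", "customerId": 7, "total": 42}

# Assumed default behavior: the replace-upsert filters on _id alone.
default_filter = {"_id": record["_id"]}
print(targets_single_shard(default_filter, shard_key))  # False

# Adding the shard key fields lets the router pick exactly one shard.
sharded_filter = {
    "_id": record["_id"],
    "region": record["region"],
    "customerId": record["customerId"],
}
print(targets_single_shard(sharded_filter, shard_key))  # True
```

A filter like the first one leaves the router unable to extract an exact shard key, which matches the code 61 error above.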

We were not able to find enough documentation on what needs to be configured for a sharded collection. The intent is for it to behave like the default ReplaceOneDefaultStrategy.

Please let us know which writemodel.strategy is applicable to a sharded collection.



 Comments   
Comment by PM Bot [ 11/Apr/23 ]

There hasn't been any recent activity on this ticket, so we're resolving it. Thanks for reaching out! Please feel free to comment on this if you're able to provide more information.

Comment by PM Bot [ 04/Apr/23 ]

Hey rajesh.vinayagam@mongodb.com, We need additional details to investigate the problem. If this is still an issue for you, please provide the requested information.

Comment by Jeffrey Yemin [ 27/Mar/23 ]

rajesh.vinayagam@mongodb.com should we consider this resolved then?

Comment by Rajesh Vinayagam [ 10/May/22 ]

Hi Ross.

I had tried that as well (including _id in document.id.strategy.partial.value.projection.list), but the connector errored out, so I removed it from the list.

I then deleted the connector and re-created it with the earlier settings and the projection list below, and it was able to replace the document:

 
"document.id.strategy.partial.value.projection.list": "xxx,xxx,_id",
 
Rajesh
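Why adding _id to the projection list keeps the source _id can be sketched with a simplified model of a replacement-style upsert: when nothing matches, the server inserts the replacement document, taking _id from an equality on _id in the filter if the replacement lacks one, and otherwise generating a new ObjectId. The sketch below assumes ReplaceOneBusinessKeyStrategy uses the projected fields as the filter and strips _id from the replacement, as described in this thread. The helper upsert_insert and the fields region and customerId (standing in for the elided xxx,xxx) are hypothetical.

```python
def upsert_insert(filter_doc, replacement):
    """Simplified model of what a replacement-style upsert inserts when no
    document matches: the replacement itself, with _id taken from an equality
    on _id in the filter, or else generated by the server."""
    new_doc = dict(replacement)
    if "_id" not in new_doc:
        # Placeholder string stands in for a real server-generated ObjectId.
        new_doc["_id"] = filter_doc.get("_id", "<server-generated ObjectId>")
    return new_doc


record = {"_id": 101, "region": "EU", "customerId": 7, "total": 42}
# The replacement never carries _id under the business-key strategy.
replacement = {k: v for k, v in record.items() if k != "_id"}

for allowlist in (["region", "customerId"], ["region", "customerId", "_id"]):
    business_key = {f: record[f] for f in allowlist}
    print(allowlist, "->", upsert_insert(business_key, replacement)["_id"])
```

With the shorter list the inserted document gets a generated _id; once _id is in the list, the filter carries 101 and the inserted document keeps it.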

Comment by Ross Lawley [ 10/May/22 ]

Hi rajesh.vinayagam@mongodb.com,

The issue is that ReplaceOneBusinessKeyStrategy removes the _id field, if present, because the identifier value comes from the supplied projection list.

Can you include _id in document.id.strategy.partial.value.projection.list? If not, I think a custom write strategy may be required for sharded clusters, one that provides both the shard key and the existing keys. If including _id works, an example should be added to the documentation to help future users.
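The custom write strategy suggested above could follow the logic sketched here: filter on the record's own _id plus every shard key field, and replace with the full record so the source _id survives an insert. This is a Python sketch of the logic only, under stated assumptions. As we understand it, a real strategy would be a Java class implementing the connector's WriteModelStrategy interface and returning a driver ReplaceOneModel with upsert enabled, which is not shown here. The function sharded_replace_model and the shard key fields are hypothetical.

```python
def sharded_replace_model(value_doc, shard_key_fields):
    """Hypothetical custom replace strategy for a sharded collection.

    Filter: the record's own _id plus every shard key field, so the upsert
    can be routed to one shard. Replacement: the full record, _id included,
    so the source _id is preserved when the upsert inserts.
    """
    required = ["_id", *shard_key_fields]
    missing = [f for f in required if f not in value_doc]
    if missing:
        raise ValueError(f"record is missing required fields: {missing}")
    filter_doc = {f: value_doc[f] for f in required}
    return {"filter": filter_doc, "replacement": dict(value_doc), "upsert": True}


record = {"_id": 101, "region": "EU", "customerId": 7, "total": 42}
model = sharded_replace_model(record, ["region", "customerId"])
print(model["filter"])  # {'_id': 101, 'region': 'EU', 'customerId': 7}
```

Keeping _id in the replacement is safe here because the filter already matches on the same _id, so an existing document is never asked to change its _id.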

Ross

Comment by Rajesh Vinayagam [ 09/May/22 ]

 

A quick update on the above issue: I tried different configurations and was able to get it working with the configuration below:

    "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy.partial.value.projection.type": "allowlist",
    "document.id.strategy.partial.value.projection.list": "xxx,xxx" 

The issue with the above configuration was that the _id from the source document was not retained; instead, a new _id was generated.

However, if the shard key matched (i.e., the values of the fields in the projection list), it was able to update the existing document.
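The behavior described above can be emulated in a few lines. This Python sketch is not the connector's Java code; it assumes that PartialValueStrategy with an allowlist and overwrite.existing replaces _id with a sub-document of the listed fields, and that ReplaceOneBusinessKeyStrategy then uses that sub-document as the replace filter and removes _id from the replacement. The function names and the fields region and customerId (standing in for the elided xxx,xxx) are hypothetical.

```python
import copy


def partial_value_id(value_doc, projection_list):
    """Hypothetical emulation of PartialValueStrategy with an allowlist:
    the new _id is a sub-document holding only the listed fields."""
    fields = [f.strip() for f in projection_list.split(",") if f.strip()]
    return {f: value_doc[f] for f in fields if f in value_doc}


def business_key_replace(value_doc, projection_list):
    """Hypothetical emulation of ReplaceOneBusinessKeyStrategy: the projected
    _id becomes the filter and is removed from the replacement document."""
    doc = copy.deepcopy(value_doc)
    doc["_id"] = partial_value_id(value_doc, projection_list)  # overwrite.existing
    business_key = doc.pop("_id")
    return business_key, doc


record = {"_id": 101, "region": "EU", "customerId": 7, "total": 42}
flt, replacement = business_key_replace(record, "region,customerId")
print(flt)          # {'region': 'EU', 'customerId': 7}
print(replacement)  # {'region': 'EU', 'customerId': 7, 'total': 42}
```

Neither the filter nor the replacement carries the original _id (101), so an existing document is still found by the shard key fields, but an upsert that inserts has no source _id to reuse and the server generates one.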

 

Is there a way to retain the _id from the source document, instead of generating a new _id, when inserting or updating with the above configuration?

Generated at Thu Feb 08 09:06:05 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.