[KAFKA-359] Copy existing pipeline seems to not work Created: 06/Mar/23  Updated: 27/Oct/23  Resolved: 05/Apr/23

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.9.1
Fix Version/s: None

Type: Question Priority: Major - P3
Reporter: Axel CZARNIAK Assignee: Jeffrey Yemin
Resolution: Gone away Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

Hello,

I have been playing with the MongoDB Kafka connector as a source for change stream events for a few weeks, and everything works fine. But now we want to copy our existing database through Kafka for a migration, using the copy-existing feature with a pipeline that adds some fields.

Here is the configuration we use:

{
    "name": "mongodb-source",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://....",
    "topic.namespace.map": "{\"*\": \"migration\"}",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "startup.mode": "copy_existing",
    "startup.mode.copy.existing.allow.disk.use": false,
    "startup.mode.copy.existing.max.threads": 10,
    "startup.mode.copy.existing.queue.size": 2500000,
    "startup.mode.copy.existing.pipeline": "[{\"$addFields\":{\"documentSize\":{\"$bsonSize\":\"$fullDocument\"}}}]"
}

The pipeline compiles correctly, since the documentSize field is present, but its value is always null. It seems the $fullDocument key is not recognized in this pipeline: it works for change stream events but not for copy existing.

If I use fullDocument instead of $fullDocument, there is an error at configuration time, which suggests the connector does recognize $fullDocument as a field reference, yet the expression produces nothing.

Since all our transformations are based on the $fullDocument key, our pipeline can't work and our migration is blocked.
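As an experiment (an assumption on our side, not confirmed by the connector documentation): if the copy-existing pipeline runs against the collection documents themselves rather than against change events, then $$ROOT would refer to the whole source document, so a pipeline like the following might produce a non-null size:

```json
[
  { "$addFields": { "documentSize": { "$bsonSize": "$$ROOT" } } }
]
```

This would still place documentSize inside the document (and therefore inside fullDocument in the generated event), not at the event root.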

 



 Comments   
Comment by PM Bot [ 05/Apr/23 ]

There hasn't been any recent activity on this ticket, so we're resolving it. Thanks for reaching out! Please feel free to comment on this if you're able to provide more information.

Comment by PM Bot [ 21/Mar/23 ]

Hey axel.czarniak@ioterop.com, we need additional details to investigate the problem. If this is still an issue for you, please provide the requested information.

Comment by Jeffrey Yemin [ 13/Mar/23 ]

Hi there, thank you for reaching out. As this sounds like a support issue, I wanted to give you some resources to get this question answered more quickly:

  • Our MongoDB support portal, located at support.mongodb.com
  • Our MongoDB community portal
  • If you are an Atlas customer, you can also click the in-app chat icon in the lower right corner to chat with a MongoDB Support representative, or click Support in the left-hand navigation to view Developer Resources.

Thank you!

Comment by Axel CZARNIAK [ 06/Mar/23 ]

Here is the generated event:

{
    "_id":{
        "_id":"6405b25b1a926a4531757588",
        "copyingData":true
    },
    "operationType":"insert",
    "documentKey":{
        "_id":"6405b25b1a926a4531757588"
    },
    "fullDocument":{
        "_id":"6405b25b1a926a4531757588",
        "timestamp":"2023-03-06T09:28:58.834Z",
        "documentSize":null
    },
    "ns":{
        "db":"*",
        "coll":"*"
    }
}

It seems that with a change stream pipeline the fields are added to the change stream document, while with the copy-existing pipeline the fields are added inside fullDocument. Is this the desired behavior?

If it is, how can we add this key at the root level rather than inside fullDocument?
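To make the question concrete, this is the shape we would like the copy-existing event to have, with documentSize at the event root instead of inside fullDocument (the value 123 is just a placeholder):

```json
{
  "_id": { "_id": "6405b25b1a926a4531757588", "copyingData": true },
  "operationType": "insert",
  "documentKey": { "_id": "6405b25b1a926a4531757588" },
  "documentSize": 123,
  "fullDocument": {
    "_id": "6405b25b1a926a4531757588",
    "timestamp": "2023-03-06T09:28:58.834Z"
  }
}
```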

PS: Sorry for using a comment, but it seems we can't edit the issue to add additional information.

Generated at Thu Feb 08 09:06:12 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.