[KAFKA-111] Failed to resume change stream: Bad resume token: _data of missing or of wrong type Created: 03/Jun/20  Updated: 28/Oct/23  Resolved: 23/Jun/20

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.1
Fix Version/s: 1.2.0

Type: Bug Priority: Critical - P2
Reporter: Rajaramesh Yaramati Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

Kafka version 2.4.0

Source MongoDB 3.6.8
Target MongoDB 3.6.8
Source connector MongoDBSourceConnector version 1.1
Sink connector MongoDBSinkConnector version 1.1


Attachments: Text File bad_resume_token_error.log    
Issue Links:
Backports
backports KAFKA-311 Failed to resume change stream - Bad ... Closed

 Description   

I am testing the source and sink MongoDB Kafka connectors. After the source connector completes the initial sync (copy.existing) and starts reading from the oplog using change streams, it hits the failure below and stops copying new changes from the source. Please take a look.

Source connector config:

curl -X POST -H "Accept:application/json" -H "Content-Type: application/json" localhost:9083/connectors/ --data '{
  "name": "mongo-source-assets-shard1oplog2",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.uri": "mongodb://xxx.xxx.xxx.xxx:27017",
    "database": "oz_next",
    "collection": "assets",
    "publish.full.document.only": "true",
    "topic.prefix": "oplog.oz_mongo",
    "batch.size": "5000",
    "copy.existing": "true",
    "copy.existing.max.threads": "3",
    "copy.existing.queue.size": "64000"
  }
}'
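
If the source task later fails, its state and full stack trace can be pulled from the standard Kafka Connect REST endpoints on the same worker (a quick check, using the connector name above):

# Connector-level status
curl -s localhost:9083/connectors/mongo-source-assets-shard1oplog2/status
# Per-task detail; a failed task includes a "trace" field with the stack trace
curl -s localhost:9083/connectors/mongo-source-assets-shard1oplog2/tasks/0/status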

Sink connector config:

curl -X POST -H "Accept:application/json" -H "Content-Type: application/json" localhost:9083/connectors/ --data '{
  "name": "mongo-sink-assets-shard1oplog2",
  "config": {
    "topics": "oplog.oz_mongo.oz_next.assets",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.uri": "mongodb://10.74.3.104:27017",
    "database": "poc_oz_next",
    "collection": "poc_assets",
    "max.num.retries": "3",
    "retries.defer.timeout": "5000",
    "session.timeout.ms": "25000"
  }
}'

Connector log:

[2020-05-29 08:40:55,565] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Finished commitOffsets successfully in 8872 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-05-29 08:41:05,566] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-05-29 08:41:05,566] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} flushing 4873 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-05-29 08:41:13,881] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Finished commitOffsets successfully in 8315 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-05-29 08:41:23,881] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-05-29 08:41:23,881] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} flushing 4604 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-05-29 08:41:31,322] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:31,326] INFO Resuming the change stream after the previous offset (com.mongodb.kafka.connect.source.MongoSourceTask:234)
[2020-05-29 08:41:31,328] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:31,331] INFO Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask:237)
[2020-05-29 08:41:31,333] INFO Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: "52b8348a4b0b1571cbf199af87458512", copyingData: true} 40647 (com.mongodb.kafka.connect.source.MongoSourceTask:253)
[2020-05-29 08:41:32,954] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Finished commitOffsets successfully in 9073 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-05-29 08:41:35,328] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:35,331] INFO Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask:237)
[2020-05-29 08:41:35,333] INFO Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: "7ed0cc7a09af100edc4db27f968231f9", copyingData: true} 40647 (com.mongodb.kafka.connect.source.MongoSourceTask:253)
[2020-05-29 08:41:36,330] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:36,334] INFO Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask:237)
[2020-05-29 08:41:36,…] INFO Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: "7ed0cc7a09af100edc4db27f96823… (com.mongodb.kafka.connect.source.MongoSourceTask:253)
(terminal session to 10.74.3.79 closed by remote host; the final log line above is truncated)
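
The offset being replayed above ({_id: "…", copyingData: true}) is the marker the connector commits while it is still copying existing data; it is not a server-generated resume token (those carry an _data field), which is why the server rejects it with error code 40647. A minimal sketch of the same failure from the shell, assuming a replica set is reachable at the (redacted) URI:

# Hypothetical reproduction: change streams require a replica set, and the
# copy-existing marker has no _data field, so cursor creation fails with
# server error 40647 ("Bad resume token: _data of missing or of wrong type").
mongo "mongodb://xxx.xxx.xxx.xxx:27017/oz_next" --eval '
  db.assets.watch([], {
    resumeAfter: { _id: "52b8348a4b0b1571cbf199af87458512", copyingData: true }
  });
'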



 Comments   
Comment by Ross Lawley [ 22/Apr/22 ]

Apologies abdul.basith.kj@gmail.com - I see you opened KAFKA-311

Comment by Ross Lawley [ 22/Apr/22 ]

Hi abdul.basith.kj@gmail.com,

Can you please open a new ticket if you haven't already resolved your issue? Please note the error message you have reported comes from the source connector and not the sink connector.

Ross

Comment by Abdul Basith [ 21/Apr/22 ]

Hi. I am deploying version 0.28.0 of the Strimzi Kafka operator (https://artifacthub.io/packages/helm/strimzi/strimzi-kafka-operator) with the MongoDB sink connector to stream data from a Kafka topic to a MongoDB database.

I get the same error whenever new data is added to the Kafka topic:

2022-04-21 12:23:20,614 INFO [mongodb-source-connector|task-0] Watching for collection changes on 'weather.data' (com.mongodb.kafka.connect.source.MongoSourceTask) [task-thread-mongodb-source-connector-0]
2022-04-21 12:23:20,615 INFO [mongodb-source-connector|task-0] Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask) [task-thread-mongodb-source-connector-0]
2022-04-21 12:23:20,617 INFO [mongodb-source-connector|task-0] Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: 5553a998e4b02cf715119f6e, copyingData: true} 40647 (com.mongodb.kafka.connect.source.MongoSourceTask) [task-thread-mongodb-source-connector-0]

Comment by Githook User [ 23/Jun/20 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Fix Source connector copying existing resumability

KAFKA-111
Branch: master
https://github.com/mongodb/mongo-kafka/commit/60cadc720d85ecb36e57de792fdb590c06d55fc1

Comment by Ross Lawley [ 15/Jun/20 ]

Thanks yaramati@adobe.com,

I'll review and try to reproduce the issue locally and fix.

Comment by Rajaramesh Yaramati [ 13/Jun/20 ]

Ross Lawley,

I was sure the connector did not restart during the copy. Just to be certain, I tried again from scratch and still hit the same issue. I have attached the full log here: bad_resume_token_error.log

The sequence of steps:

Step 1: Created a new sharded collection.

Step 2: Imported sample data (10,000 docs) into the sharded collection.

Step 3: Started the source connector task via the REST API, as shown in the attached log.

As soon as the source connector finished fetching the 10,000 docs, I started seeing the "Failed to resume change stream: Bad resume token: _data of missing or of wrong type" message in the log.

I am able to reproduce this error again and again on my test server.
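
A rough shell equivalent of these steps (hypothetical host, shard key, and file names; the POST body is the source connector config from the description):

# Step 1: create the sharded collection (requires a mongos; the hashed
# shard key is an assumption for illustration).
mongo "mongodb://xxx.xxx.xxx.xxx:27017" --eval '
  sh.enableSharding("oz_next");
  sh.shardCollection("oz_next.assets", { _id: "hashed" });
'

# Step 2: import ~10,000 sample documents (hypothetical file name).
mongoimport --uri "mongodb://xxx.xxx.xxx.xxx:27017/oz_next" \
  --collection assets --file sample_assets.json

# Step 3: start the source connector via the Connect REST API.
curl -X POST -H "Content-Type: application/json" localhost:9083/connectors/ \
  --data @mongo-source-assets.json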

Thanks,

Rajaramesh

Comment by Ross Lawley [ 09/Jun/20 ]

Hi yaramati@adobe.com,

It looks like the connector was restarted during the copy-existing (data copying) phase. That is an error scenario, so the connector is expected to stop. More logs would help determine whether that was the case.

The error messaging should be clearer, so that will be improved in a future release.
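
One way to check whether a copy-phase offset was committed (and would be replayed on restart) is to read the Connect offsets topic directly. A rough sketch, assuming the default topic name connect-offsets and a broker on localhost:9092 (both are deployment-specific):

# During copy.existing the committed offset value looks like
#   {"_id": "<uuid>", "copyingData": true}
# whereas a genuine post-copy offset is a resume token with an "_data" field.
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic connect-offsets --from-beginning --property print.key=true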

Ross

Comment by Rajaramesh Yaramati [ 08/Jun/20 ]

Can anyone please confirm if this is a known issue? 
