[KAFKA-311] Failed to resume change stream - Bad resume token Created: 21/Apr/22  Updated: 27/Oct/23  Resolved: 26/Apr/22

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Task Priority: Major - P3
Reporter: Abdul Basith Assignee: Ross Lawley
Resolution: Works as Designed Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Backports
backported by KAFKA-111 Failed to resume change stream: Bad r... Closed

 Description   

Hi. I am deploying the latest version (0.28.0) of the Strimzi Kafka Operator Helm chart in my Kubernetes cluster (https://artifacthub.io/packages/helm/strimzi/strimzi-kafka-operator) with the MongoDB sink connector to stream data from a Kafka topic to a MongoDB database.

I get the same error whenever new data is added to the Kafka topic. This is what appears in the logs:

 

2022-04-21 12:23:20,614 INFO [mongodb-source-connector|task-0] Watching for collection changes on 'weather.data' (com.mongodb.kafka.connect.source.MongoSourceTask) [task-thread-mongodb-source-connector-0]
2022-04-21 12:23:20,615 INFO [mongodb-source-connector|task-0] Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask) [task-thread-mongodb-source-connector-0]
2022-04-21 12:23:20,617 INFO [mongodb-source-connector|task-0] Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: 5553a998e4b02cf715119f6e, copyingData: true} 40647 (com.mongodb.kafka.connect.source.MongoSourceTask) [task-thread-mongodb-source-connector-0]



 Comments   
Comment by Abdul Basith [ 28/Apr/22 ]

Thanks much Ross. Appreciate the support provided.

Comment by Ross Lawley [ 26/Apr/22 ]

Hi abdul.basith.kj@gmail.com,

> The problem arises only when I use the Python API to add new data directly to the topic - it could have been a rogue record causing that issue. I removed that record and I don't get the issue anymore.

Glad you were able to sort the issue. From the error message it looks like some data required by the source connector was projected out (possibly by the provided pipeline). There are expectations about the shape of the data the source connector creates. Error handling in the sink connector can also be routed to a dead letter queue (DLQ) - see the errors.tolerance Kafka Connect configuration.
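For example, error handling on a MongoDB sink connector could be configured roughly like this (a minimal sketch in the same KafkaConnector descriptor style; the connector name, topics and DLQ topic name are illustrative, not taken from your setup):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mongodb-sink-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 1
  config:
    connection.uri: <<mongodb_connection_uri>>
    topics: kafka.weather.data
    database: weather
    collection: data
    # continue past records that fail conversion or writing instead of failing the task
    errors.tolerance: all
    # route failed records to a dead letter queue topic (topic name is an example)
    errors.deadletterqueue.topic.name: dlq.weather.data
    errors.deadletterqueue.context.headers.enable: true
    errors.log.enable: true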

Just to let you know, for general usage questions our MongoDB community portal might be useful, as you can discuss any issues and how-tos with the wider community.

All the best,

Ross Lawley


Comment by Abdul Basith [ 24/Apr/22 ]

Hi. I have found something.

 

I have both the Mongo source and sink connectors enabled. When I add a document directly via a tool like 'Studio 3T' (using Copy Document, then Paste Document after changing the _id), the new record gets copied to the topic through the source connector and then into the sink Mongo database through the sink connector - the whole workflow works perfectly.

 

The problem arises only when I use the Python API to add new data directly to the topic - it could have been a rogue record causing that issue. I removed that record and I don't get the issue anymore.

 

Now, when I try to add a new record with the code below, the record does not get copied to the topic and I get a new error:

 

from kafka import KafkaProducer
from kafka.errors import KafkaError

import json

producer = KafkaProducer(
    bootstrap_servers=["broker_ip:9092"],
    value_serializer=lambda m: json.dumps(m).encode('ascii'),
)

# The record value is the document as an extended-JSON string;
# "..." is a placeholder for the document's ObjectId.
producer.send('kafka.weather.data', {
    "key": "_id",
    "value": "{\"_id\": {\"$oid\": \"...\"}, \"st\": \"x+47100-059501\", "
             "\"ts\": {\"$date\": 447400700000}, "
             "\"position\": {\"type\": \"Point\", \"coordinates\": [-59.4, 47.1]}, "
             "\"elevation\": 9998}",
})

Error:

2022-04-24 22:00:37,917 ERROR Uncaught exception in REST call to /connectors/mongodb-source-connector/topics (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) [qtp1489322191-15]
javax.ws.rs.NotFoundException: HTTP 404 Not Found
        at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:250)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
        at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
        at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)

 

I have verified that the topic 'kafka.weather.data' is available but somehow the source connector couldn't find it with that REST API call.
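That REST call is Kafka Connect's per-connector topics endpoint; roughly, it can be reproduced like this (a sketch - the Connect REST host/port are placeholders, like broker_ip above):

import requests

# Query Kafka Connect's active-topics endpoint for the connector
resp = requests.get("http://connect_rest_host:8083/connectors/mongodb-source-connector/topics")
print(resp.status_code)
print(resp.text)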

Comment by Abdul Basith [ 22/Apr/22 ]

For some reason, the last 'pipeline' line of the descriptor file is not showing up properly. Trying again here: 
[{"$match": {"operationType": {"$in": ["insert","update","replace"]}}}, {"$project": {"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}]

Comment by Abdul Basith [ 22/Apr/22 ]

Thanks much @Ross Lawley (ross@mongodb.com). Just realized it's the source connector that has the issue.

 

So I am using the MongoDB source connector to connect to the Kafka topic. Here is the descriptor file:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mongodb-source-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSourceConnector
  tasksMax: 2
  config:
    connection.uri: <<mongodb_connection_uri>>
    topic.prefix: kafka
    database: weather
    collection: data
    copy.existing: true
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    publish.full.document.only: true
    pipeline: >
      [{"$match": {"operationType": {"$in": ["insert","update","replace"]}}},
       {"$project": {"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}]

So the first time, all of the MongoDB source collection's data gets copied to the Kafka topic 'kafka.weather.data'. But then, when I add a new document to the MongoDB source collection, I get the above error.
 
Please let me know if you need more information. 

Comment by Ross Lawley [ 22/Apr/22 ]

Hi abdul.basith.kj@gmail.com,

The error message is coming from the Source connector and not the sink connector. Can you provide more information on how you are using the source connector?

Ross
