[KAFKA-325] Source Connector Stays Running When Falling Off Oplog Created: 27/Jul/22  Updated: 27/Oct/23  Resolved: 28/Sep/22

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.7.0
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Errol Kutan Assignee: Maxim Katcharov
Resolution: Gone away Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

In scenarios where the Kafka connector falls off the oplog, it throws an exception indicating that it cannot resume the underlying change stream. When mongo.errors.tolerance is set to "none", we would expect this to put the connector into a FAILED state, which would alert admins/operators that manual intervention is required.

However, the connector remains in a RUNNING state, so users will not be aware of the problem until symptoms begin to manifest in the downstream system/data pipeline.
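Whether a task ended up FAILED is visible through the Kafka Connect REST API (GET /connectors/{name}/status). A minimal Python sketch of such a check, assuming a Connect worker at localhost:8083 (hypothetical host/port):

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"  # assumed Connect worker address

def non_running_tasks(status: dict) -> list:
    """Return ids of tasks whose state is not RUNNING in a /status payload."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") != "RUNNING"]

def check(connector: str) -> list:
    """Fetch GET /connectors/{connector}/status and flag non-RUNNING tasks."""
    with urlopen(f"{CONNECT_URL}/connectors/{connector}/status") as resp:
        return non_running_tasks(json.load(resp))
```

In the scenario reported here, check("abor-mongo-ingestion-source") would return an empty list even though the change stream can no longer resume.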

Example:

Connector config:

{
    "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri":"mongodb+srv://abor-development-20211025:<EMAIL redactopus@example.com>/?readPreference=secondary",
    "database":"development",
    "collection":"ingestion",
    "publish.full.document.only":"true",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable":"false",
    "batch.size":"0",
    "offset.partition.name":"abor-ingestion-1",
    "heartbeat.interval.ms":"60000",
    "poll.await.time.ms":"5000",
    "poll.max.batch.size":"1000",
    "tasks.max":"1",
    "pipeline":"[{\"$match\": { \"$and\": [ {\"fullDocument.pk_credit\":{$ne:null}}, {\"fullDocument.pk_debit\":{$ne:null}} ] } }]",
    "topic.namespace.map":"{ \"development.ingestion\": \"cdc.abor.ingestion\" }",
    "topic.prefix":"",
    "topic.suffix":"",
    "errors.log.enable":"true",
    "name":"abor-mongo-ingestion-source"
} 

1. The connector falls off the oplog, as indicated by the change stream resume failure in the mongod log below:

{
    "t": {
        "$date": "2022-07-20T08:52:41.428+00:00"
    },
    "s": "I",
    "c": "COMMAND",
    "id": 51803,
    "ctx": "conn302681",
    "msg": "Slow query",
    "attr": {
        "type": "command",
        "ns": "development.ingestion",
        "command": {
            "getMore": 7867414900284144865,
            "collection": "ingestion",
            "maxTimeMS": 1000,
            "lsid": {
                "id": {
                    "$uuid": "6cc7d463-0d99-465e-8c08-cf4609766c62"
                },
                "uid": {
                    "$binary": {
                        "base64": "wqELVKpJotbJlP9+PF/vEE7da06fWqqQdviC6wP43Ts=",
                        "subType": "0"
                    }
                }
            },
            "$clusterTime": {
                "clusterTime": {
                    "$timestamp": {
                        "t": 1658307161,
                        "i": 3362
                    }
                },
                "signature": {
                    "hash": {
                        "$binary": {
                            "base64": "LRNoxwZcxRxqwYQuLJ3LFE2sP+0=",
                            "subType": "0"
                        }
                    },
                    "keyId": 7059853149929996291
                }
            },
            "$audit": {
                "$impersonatedUsers": [
                    {
                        "user": "abor-development-20211025",
                        "db": "admin"
                    }
                ],
                "$impersonatedRoles": [
                    {
                        "role": "readWrite",
                        "db": "development"
                    }
                ]
            },
            "$client": {
                "driver": {
                    "name": "mongo-java-driver|sync|mongo-kafka|source",
                    "version": "4.5.0|1.7.0"
                },
                "os": {
                    "type": "Linux",
                    "name": "Linux",
                    "architecture": "amd64",
                    "version": "5.4.94"
                },
                "platform": "Java/Private Build/11.0.15+10-Ubuntu-0ubuntu0.20.04.1",
                "mongos": {
                    "host": "atlas-1q1s8m-shard-00-02.wzyq3.mongodb.net:27016",
                    "client": "192.168.248.107:37164",
                    "version": "4.4.15"
                }
            },
            "$configServerState": {
                "opTime": {
                    "ts": {
                        "$timestamp": {
                            "t": 1658307161,
                            "i": 496
                        }
                    },
                    "t": 20
                }
            },
            "$db": "development"
        },
        "originatingCommand": {
            "aggregate": "ingestion",
            "pipeline": [
                {
                    "$changeStream": {
                        "fullDocument": "updateLookup",
                        "startAfter": {
                            "_data": "8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004"
                        }
                    }
                }
            ],
            "fromMongos": true,
            "needsMerge": true,
            "collation": {
                "locale": "simple"
            },
            "cursor": {
                "batchSize": 0
            },
            "runtimeConstants": {
                "localNow": {
                    "$date": "2022-07-20T08:52:41.391Z"
                },
                "clusterTime": {
                    "$timestamp": {
                        "t": 1658307161,
                        "i": 3247
                    }
                }
            },
            "use44SortKeys": true,
            "useNewUpsert": true,
            "readConcern": {
                "provenance": "implicitDefault"
            },
            "writeConcern": {
                "w": 1,
                "wtimeout": 0,
                "provenance": "implicitDefault"
            },
            "clientOperationKey": {
                "$uuid": "e2974d33-cdac-44ae-bb3d-8ab6e4bb618c"
            },
            "lsid": {
                "id": {
                    "$uuid": "6cc7d463-0d99-465e-8c08-cf4609766c62"
                },
                "uid": {
                    "$binary": {
                        "base64": "wqELVKpJotbJlP9+PF/vEE7da06fWqqQdviC6wP43Ts=",
                        "subType": "0"
                    }
                }
            },
            "$readPreference": {
                "mode": "secondary"
            },
            "$clusterTime": {
                "clusterTime": {
                    "$timestamp": {
                        "t": 1658307161,
                        "i": 3247
                    }
                },
                "signature": {
                    "hash": {
                        "$binary": {
                            "base64": "LRNoxwZcxRxqwYQuLJ3LFE2sP+0=",
                            "subType": "0"
                        }
                    },
                    "keyId": 7059853149929996291
                }
            },
            "$audit": {
                "$impersonatedUsers": [
                    {
                        "user": "abor-development-20211025",
                        "db": "admin"
                    }
                ],
                "$impersonatedRoles": [
                    {
                        "role": "readWrite",
                        "db": "development"
                    }
                ]
            },
            "$client": {
                "driver": {
                    "name": "mongo-java-driver|sync|mongo-kafka|source",
                    "version": "4.5.0|1.7.0"
                },
                "os": {
                    "type": "Linux",
                    "name": "Linux",
                    "architecture": "amd64",
                    "version": "5.4.94"
                },
                "platform": "Java/Private Build/11.0.15+10-Ubuntu-0ubuntu0.20.04.1",
                "mongos": {
                    "host": "atlas-1q1s8m-shard-00-02.wzyq3.mongodb.net:27016",
                    "client": "192.168.248.107:37164",
                    "version": "4.4.15"
                }
            },
            "$configServerState": {
                "opTime": {
                    "ts": {
                        "$timestamp": {
                            "t": 1658307161,
                            "i": 496
                        }
                    },
                    "t": 20
                }
            },
            "$db": "development"
        },
        "planSummary": "COLLSCAN",
        "cursorid": 7867414900284144865,
        "numYields": 0,
        "ok": 0,
        "errMsg": "Resume of change stream was not possible, as the resume point may no longer be in the oplog.",
        "errName": "ChangeStreamHistoryLost",
        "errCode": 286,
        "reslen": 521,
        "locks": {
            "FeatureCompatibilityVersion": {
                "acquireCount": {
                    "r": 1
                }
            },
            "ReplicationStateTransition": {
                "acquireCount": {
                    "w": 1
                }
            },
            "Global": {
                "acquireCount": {
                    "r": 1
                }
            },
            "Database": {
                "acquireCount": {
                    "r": 1
                }
            },
            "Mutex": {
                "acquireCount": {
                    "r": 1
                }
            },
            "oplog": {
                "acquireCount": {
                    "r": 1
                }
            }
        },
        "readConcern": {
            "level": "majority"
        },
        "writeConcern": {
            "w": 1,
            "wtimeout": 0,
            "provenance": "implicitDefault"
        },
        "storage": {},
        "protocol": "op_msg",
        "durationMillis": 33
    }
} 

See specifically:

    "errMsg": "Resume of change stream was not possible, as the resume point may no longer be in the oplog.",
    "errName": "ChangeStreamHistoryLost",

2. We then see exceptions in the Kafka connector, yet it continues to run and send heartbeats:

"2022-07-20T04:34:10.531Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,531] INFO [abor-mongo-ingestion-source|task-0] An exception occurred when trying to get the next item from the Change Stream (com.mongodb.kafka.connect.source.MongoSourceTask:648)"
"2022-07-20T04:34:10.523Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,523] INFO [abor-mongo-ingestion-source|task-0] Resuming the change stream after the previous offset: {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.MongoSourceTask:430)"
"2022-07-20T04:34:10.522Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,522] INFO [abor-mongo-ingestion-source|task-0] Watching for collection changes on 'development.ingestion' (com.mongodb.kafka.connect.source.MongoSourceTask:696)"
"2022-07-20T04:34:10.522Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,522] INFO [abor-mongo-ingestion-source|task-0] Resume token from heartbeat: {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.MongoSourceTask:735)"
"2022-07-20T04:34:10.519Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,518] INFO [abor-mongo-ingestion-source|task-0] Generating heartbeat event. {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager:77)"
"2022-07-20T04:34:10.518Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,518] INFO [abor-mongo-ingestion-source|task-0] An exception occurred when trying to get the next item from the Change Stream (com.mongodb.kafka.connect.source.MongoSourceTask:648)"
"2022-07-20T04:34:10.510Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,510] INFO [abor-mongo-ingestion-source|task-0] Resuming the change stream after the previous offset: {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.MongoSourceTask:430)" 



 Comments   
Comment by PM Bot [ 28/Sep/22 ]

There hasn't been any recent activity on this ticket, so we're resolving it. Thanks for reaching out! Please feel free to comment on this if you're able to provide more information.

Comment by Maxim Katcharov [ 13/Sep/22 ]

I am not sure what the next step should be, so I wanted to make sure I properly understood what was going on.

The description mentions that after throwing the exception, the task continued to emit logs ("Resume token from heartbeat..."). However, when I tried to reproduce, once the task threw an exception, the poll method in the task was not called again. The only path to those log messages is through the poll method of the task, which is no longer being called. Are you really still seeing those logs after confirming that the exception is thrown?

What exactly do you mean by "entering a FAILED state"? Is this observed in the task, or the connector? How are you verifying that it is or is not in a failed state?

The page at https://docs.confluent.io/5.5.0/connect/managing/monitoring.html states: “FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output)”, which suggests that the way we put a task into a failed state is by raising an exception. For me, this seems to be successful, because the task is no longer executed.
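The contract described above can be sketched schematically (plain Python, not the actual Connect runtime): the worker loop calls poll(), and an uncaught exception marks the task FAILED and stops further polling.

```python
class WorkerTaskSketch:
    """Schematic model of how a Connect worker treats a source task's poll()."""

    def __init__(self, poll_fn):
        self.poll_fn = poll_fn
        self.state = "RUNNING"

    def iterate(self):
        if self.state == "FAILED":
            return None  # once failed, poll() is never invoked again
        try:
            return self.poll_fn()
        except Exception:
            self.state = "FAILED"  # the exception is reported in the status output
            return None
```

Under this model, a poll that raises (e.g. a ConnectException on a lost resume token) leaves the task FAILED and un-polled, matching the reproduction described in this comment; the ticket's report of continued heartbeats is precisely what this model cannot explain.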

Comment by Errol Kutan [ 30/Aug/22 ]

Hi maxim.katcharov@mongodb.com, the issue is not that the connector doesn't throw an exception – it does. The issue is that the connector stays in a RUNNING state when it should enter a FAILED state. The connector needs to be manually reset, and the user has no clue until they experience downstream symptoms and go through the logs.
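For the manual reset, the Connect REST API exposes POST /connectors/{name}/tasks/{id}/restart. A minimal sketch, assuming a worker at localhost:8083 (hypothetical host/port):

```python
from urllib.request import Request, urlopen

CONNECT_URL = "http://localhost:8083"  # assumed Connect worker address

def restart_task_request(connector: str, task_id: int) -> Request:
    """Build the POST (empty body) that restarts a single task."""
    url = f"{CONNECT_URL}/connectors/{connector}/tasks/{task_id}/restart"
    return Request(url, method="POST")

def restart_task(connector: str, task_id: int) -> None:
    urlopen(restart_task_request(connector, task_id))
```

Note that after falling off the oplog, a restart alone resumes from the same stale token; the stored offset also has to be cleared before the task can make progress.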

Comment by Maxim Katcharov [ 16/Aug/22 ]

errol.kutan@mongodb.com I could not reproduce this. I used IntelliJ to debug a throttled source task, with default settings. Once the connector falls off the oplog, a "ConnectException: ResumeToken not found. Cannot create a change stream cursor" is thrown by the task's poll method, after which the poll method is no longer invoked, and heartbeats are no longer sent.

I am not sure why your task is not throwing an exception. To determine the difference, it may help if, as a next step, you attempt the same process: create a minimal reproducer and confirm whether you are still experiencing the problem.

Generated at Thu Feb 08 09:06:07 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.