Type: Bug
Resolution: Fixed
Priority: Critical - P2
Affects Version/s: 1.0
Component/s: None
Environment: (copied to CRM)
SourceConnector failing after dropDatabase
Using a single-instance MongoDB Source Connector running 1 task. With a few different configurations, I can get the connector to fail after performing dropDatabase; it seems something is going wrong when handling an invalidate event.
SCENARIO 1: Watching a database with a topic prefix. Here is the configuration:
{ "name": "sourcetest", "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://USER:PASS@192.168.74.101:27001/?authSource=admin&replicaSet=repl-example&ssl=false", "database": "test", "topic.prefix": "topicprefix" }
1. create database "test"
2. create collections testone, testtwo, testthree
3. insert documents into the collections; you'll see insert events show up on topics topicprefix.test.testone|testtwo|testthree
4. Run dropDatabase. You'll see drop events in topicprefix.test.testone|testtwo|testthree, a dropDatabase event in topicprefix.test, and an invalidate in "topicprefix."
5. Repeat steps 1-4 above. You will get many duplicate drop and dropDatabase events in each topic, and duplicate invalidates in the "topicprefix." topic. Pasting that topic output below.
6. Connector goes into a degraded state, then fails. Here is the stack trace from connect.log:
[2020-02-14 14:21:33,535] INFO WorkerSourceTask{id=sourceyetagain-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-02-14 14:21:33,538] INFO Resuming the change stream at the previous offset (com.mongodb.kafka.connect.source.MongoSourceTask:231)
[2020-02-14 14:21:33,539] INFO WorkerSourceTask{id=sourceyetagain-0} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-02-14 14:21:33,540] INFO WorkerSourceTask{id=sourceyetagain-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-02-14 14:21:33,540] INFO WorkerSourceTask{id=sourceyetagain-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-02-14 14:21:33,540] ERROR WorkerSourceTask{id=sourceyetagain-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 260 (InvalidResumeToken): 'Attempting to resume a change stream using 'resumeAfter' is not allowed from an invalidate notification.' on server 192.168.74.101:27001. The full response is {"operationTime": {"$timestamp": {"t": 1581690087, "i": 4}}, "ok": 0.0, "errmsg": "Attempting to resume a change stream using 'resumeAfter' is not allowed from an invalidate notification.", "code": 260, "codeName": "InvalidResumeToken", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1581690087, "i": 4}}, "signature": {"hash": {"$binary": "ypvQF5zIIi6nfN2SZ8+WTpATP6I=", "$type": "00"}, "keyId": {"$numberLong": "6782908947104792578"}}}}
	at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:175)
	at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:303)
	at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
	at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:450)
	at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:226)
	at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
	at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
	at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
	at com.mongodb.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:343)
	at com.mongodb.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:334)
	at com.mongodb.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:220)
	at com.mongodb.operation.CommandOperationHelper$5.call(CommandOperationHelper.java:206)
	at com.mongodb.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:463)
	at com.mongodb.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:203)
	at com.mongodb.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:200)
	at com.mongodb.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:340)
	at com.mongodb.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:336)
	at com.mongodb.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:463)
	at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:336)
	at com.mongodb.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:61)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:196)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:203)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
	at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:128)
	at com.mongodb.client.internal.ChangeStreamIterableImpl$1.iterator(ChangeStreamIterableImpl.java:123)
	at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:236)
	at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:316)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:154)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-02-14 14:21:33,541] ERROR WorkerSourceTask{id=sourceyetagain-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-02-14 14:21:33,544] INFO Closed connection [connectionId{localValue:46, serverValue:3344}] to 192.168.74.101:27001 because the pool has been closed. (org.mongodb.driver.connection:71)
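(For what it's worth, error 260 above indicates the task re-created the stream with resumeAfter pointing at the invalidate event's token, which the server rejects. MongoDB 4.2+ / Java driver 3.11+ expose startAfter for resuming past an invalidate. A minimal fragment, continuing the sketch above, where invalidateToken is a hypothetical cached token from event.getResumeToken():)

// resumeAfter(invalidateToken) -> error 260 (InvalidResumeToken), as above.
// startAfter accepts an invalidate event's token (server 4.2+, driver 3.11+):
ChangeStreamIterable<Document> resumed = client.getDatabase("test")
        .watch()
        .startAfter(invalidateToken);  // invalidateToken: BsonDocument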
Topic output:
[root@c7402 bin]# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topicprefix2.
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
"{\"_id\": {\"_data\": \"825E46A8AF000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581689007, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
"{\"_id\": {\"_data\": \"825E46ACE7000000042B022C0100296F04\"}, \"operationType\": \"invalidate\", \"clusterTime\": {\"$timestamp\": {\"t\": 1581690087, \"i\": 4}}}"
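(My read on the duplicates, as an assumption rather than a confirmed diagnosis: each automatic restart of the failed task resumes from the last committed offset, which predates the invalidate, so the same drop/dropDatabase/invalidate events get replayed and re-published before the task dies on the invalidate token again. Replay on resume is normal change-stream behavior and can be seen with the sketch above by resuming from a stale token:)

// Resuming from a token captured before dropDatabase re-delivers the same
// drop/dropDatabase/invalidate sequence on every restart -- matching the
// duplicate records above. tokenBeforeDrop is hypothetical (a cached
// pre-drop resume token, standing in for the task's last committed offset).
MongoCursor<ChangeStreamDocument<Document>> replay = client.getDatabase("test")
        .watch()
        .resumeAfter(tokenBeforeDrop)
        .iterator();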
SCENARIO 2: Watching a database. Here is the configuration:
{ "name": "sourcetest", "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://USER:PASS@192.168.74.101:27001/?authSource=admin&replicaSet=repl-example&ssl=false", "database": "test" }
1. create database "test"
2. create collections testone, testtwo, testthree
3. insert documents into the collections; you'll see insert events show up on topics test.testone|testtwo|testthree
4. Run dropDatabase. You'll see drop events in test.testone|testtwo|testthree and a dropDatabase event in test.
5. Connector goes into a degraded state, then fails. Here is the stack trace from connect.log:
[2020-02-17 19:53:51,865] ERROR [Producer clientId=connector-producer-testsource-0] Metadata response reported invalid topics [] (org.apache.kafka.clients.Metadata:276)
[2020-02-17 19:53:51,866] ERROR WorkerSourceTask{id=testsource-0} failed to send record to : (org.apache.kafka.connect.runtime.WorkerSourceTask:347)
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: []
[2020-02-17 19:53:51,897] INFO WorkerSourceTask{id=testsource-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-02-17 19:53:51,897] INFO WorkerSourceTask{id=testsource-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-02-17 19:53:56,898] ERROR WorkerSourceTask{id=testsource-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:438)
[2020-02-17 19:53:56,898] ERROR WorkerSourceTask{id=testsource-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:258)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:227)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: []
[2020-02-17 19:53:56,899] ERROR WorkerSourceTask{id=testsource-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-02-17 19:53:56,899] INFO Closed connection [connectionId{localValue:9, serverValue:3379}] to 192.168.74.101:27001 because the pool has been closed. (org.mongodb.driver.connection:71)
[2020-02-17 19:53:56,900] INFO [Producer clientId=connector-producer-testsource-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
[2020-02-17 19:53:56,905] INFO Publish thread interrupted for client_id=connector-producer-testsource-0 client_type=PRODUCER session= cluster=iRS7tJyDT6CRf2t6IdewJg (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:285)
[2020-02-17 19:53:56,906] INFO Publishing Monitoring Metrics stopped for client_id=connector-producer-testsource-0 client_type=PRODUCER session= cluster=iRS7tJyDT6CRf2t6IdewJg (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:297)
[2020-02-17 19:53:56,907] INFO [Producer clientId=confluent.monitoring.interceptor.connector-producer-testsource-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
[2020-02-17 19:53:56,916] INFO Closed monitoring interceptor for client_id=connector-producer-testsource-0 client_type=PRODUCER session= cluster=iRS7tJyDT6CRf2t6IdewJg (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:320)
[2020-02-17 19:54:09,717] INFO WorkerSourceTask{id=testsource-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-02-17 19:54:09,718] INFO WorkerSourceTask{id=testsource-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-02-17 19:54:14,717] ERROR WorkerSourceTask{id=testsource-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:438)
[2020-02-17 19:54:14,718] ERROR WorkerSourceTask{id=testsource-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)
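(An assumption about this second failure, not confirmed from the connector source: the topic name appears to be built from topic.prefix plus the event's namespace, and a database-level invalidate carries no namespace, so with no prefix configured the record maps to an empty topic name, which the producer rejects as InvalidTopicException: Invalid topics: []. A hypothetical sketch of that mapping:)

// Hypothetical reconstruction of the topic-name mapping, not the
// connector's actual code: join the non-empty name parts with ".".
static String topicName(String prefix, String db, String coll) {
    return java.util.stream.Stream.of(prefix, db, coll)
            .filter(s -> s != null && !s.isEmpty())
            .collect(java.util.stream.Collectors.joining("."));
}
// topicName("topicprefix", "test", "testone") -> "topicprefix.test.testone"
// topicName("", "test", "")  -> "test"          (dropDatabase event)
// topicName("", "", "")      -> ""              (invalidate, no prefix)
//   -> the producer rejects "" with InvalidTopicException: Invalid topics: []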
is duplicated by: KAFKA-89 Kafka source connector swallows change stream failures (Closed)