[KAFKA-212] Closing cursor exception Created: 26/Mar/21  Updated: 28/Oct/23  Resolved: 29/Mar/21

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: None
Fix Version/s: 1.5.0

Type: Bug Priority: Major - P3
Reporter: Andrey B Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
related to JAVA-4289 Change stream cursors should be resil... Closed

 Description   

Hi
I have a connector that connects to a MongoDB replica set with readPreference=secondary.
One of the secondary members was killed, and the connector failed with the following error:

[2021-03-25 10:08:24,148] INFO Server 10.180.34.125:27017 is no longer a member of the replica set.  Removing from client view of cluster. (org.mongodb.driver.cluster:71)
[2021-03-25 10:08:24,148] INFO Setting max set version to 36 from replica set primary mongodb1:27017 (org.mongodb.driver.cluster:71)
[2021-03-25 10:08:24,176] INFO Closed connection [connectionId{localValue:54, serverValue:278213}] to 10.180.34.125:27017 because the pool has been closed. (org.mongodb.driver.connection:71)
[2021-03-25 10:08:24,176] ERROR WorkerSourceTask{id=mongo-source-ed-main-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
java.lang.IllegalStateException: state should be: open
	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:72)
	at com.mongodb.internal.connection.DefaultServer.getConnection(DefaultServer.java:90)
	at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:133)
	at com.mongodb.client.internal.ClientSessionBinding$SessionBindingConnectionSource.getConnection(ClientSessionBinding.java:135)
	at com.mongodb.operation.QueryBatchCursor.killCursor(QueryBatchCursor.java:322)
	at com.mongodb.operation.QueryBatchCursor.close(QueryBatchCursor.java:185)
	at com.mongodb.operation.ChangeStreamBatchCursor.close(ChangeStreamBatchCursor.java:97)
	at com.mongodb.client.internal.MongoChangeStreamCursorImpl.close(MongoChangeStreamCursorImpl.java:53)
	at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:588)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2021-03-25 10:08:24,176] ERROR WorkerSourceTask{id=mongo-source-ed-main-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
[2021-03-25 10:08:24,177] INFO Stopping MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:329)

As I understand it, the exception happened because the task tried to close a cursor that had already been closed.

This happened on version 1.3.0, but this part of the code on master hasn't changed since 1.3.0.



 Comments   
Comment by Andrey B [ 29/Mar/21 ]

Hi, Ross!
Thanks for the quick fix.

When are you planning to make the next release?

Comment by Githook User [ 29/Mar/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Ensure closing the change stream cursor doesn't leak any errors.

KAFKA-212
Branch: master
https://github.com/mongodb/mongo-kafka/commit/7dd610eb9851281064349a384cff337256947b30
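A minimal sketch of the idea in the commit message ("closing the change stream cursor doesn't leak any errors"), not the connector's actual code. It assumes the cursor can be treated as a plain `java.lang.AutoCloseable` (the driver's `MongoCursor` should be `java.io.Closeable`); the class `QuietClose` and method `closeQuietly` are hypothetical names. The `main` method simulates the failing close from the stack trace above.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public final class QuietClose {
    private static final Logger LOGGER = Logger.getLogger(QuietClose.class.getName());

    private QuietClose() {}

    /**
     * Hypothetical helper: closes the resource, logging and swallowing any exception
     * so that a failure during close (for example an IllegalStateException from a
     * server that left the replica set) does not propagate to the caller.
     * Returns true if close() completed without error.
     */
    public static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return true; // nothing to close
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Failed to close cursor; ignoring", e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulates the failure in the stack trace: close() throws because the server's pool is closed.
        AutoCloseable failingCursor = () -> {
            throw new IllegalStateException("state should be: open");
        };
        boolean closedCleanly = closeQuietly(failingCursor);
        System.out.println("closed cleanly: " + closedCleanly);
    }
}
```

Swallowing the exception is acceptable here only because the cursor is being discarded anyway; the warning is still logged so the failure stays visible.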

Comment by Ross Lawley [ 29/Mar/21 ]

Hi andreworty@gmail.com,

Thanks for the ticket. This looks like an issue in the Java driver itself; it should be more resilient to replica set changes.

That said, the connector can handle an exception better when trying to close the cursor, so that it can close the cursor and then retry by creating a new one that matches the read preference.
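A minimal sketch of the close-then-recreate approach described above, under stated assumptions: `Cursor` is a hypothetical stand-in for the driver's change stream cursor, and the `Supplier` is assumed to open a new cursor using the configured read preference. `RecreatingCursor` and `poll` are illustrative names, not the connector's API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class RecreatingCursor<T> {

    /** Hypothetical stand-in for the driver's change stream cursor. */
    public interface Cursor<T> extends AutoCloseable {
        T tryNext(); // next document, or null if none is available yet

        @Override
        void close();
    }

    private final Supplier<Cursor<T>> cursorFactory;
    private Cursor<T> cursor;

    public RecreatingCursor(Supplier<Cursor<T>> cursorFactory) {
        this.cursorFactory = cursorFactory;
    }

    /** Returns the next document, or null if none is ready or the cursor was just reset. */
    public T poll() {
        if (cursor == null) {
            cursor = cursorFactory.get(); // assumed to re-select a server per the read preference
        }
        try {
            return cursor.tryNext();
        } catch (RuntimeException e) {
            resetCursor(); // discard the broken cursor; the next poll() creates a new one
            return null;
        }
    }

    private void resetCursor() {
        try {
            cursor.close();
        } catch (RuntimeException ignored) {
            // e.g. IllegalStateException("state should be: open") when the server left
            // the replica set; the cursor is being discarded, so this is ignored.
        } finally {
            cursor = null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        RecreatingCursor<String> source = new RecreatingCursor<>(() -> {
            int n = created.incrementAndGet();
            return new Cursor<String>() {
                @Override
                public String tryNext() {
                    if (n == 1) {
                        throw new IllegalStateException("server removed");
                    }
                    return "change-event";
                }

                @Override
                public void close() {
                    if (n == 1) {
                        throw new IllegalStateException("state should be: open");
                    }
                }
            };
        });
        System.out.println(source.poll()); // null: first cursor failed and was discarded
        System.out.println(source.poll()); // change-event: a new cursor was created
        System.out.println(created.get()); // 2
    }
}
```

The key design choice is that an error from `close()` on a cursor that is already being thrown away is not allowed to escape, which is exactly where the task in the stack trace died.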

Ross

Comment by Andrey B [ 26/Mar/21 ]

The ticket description is not quite correct, but I can't edit it: the secondary was hidden, not killed.

The problem reproduces on master.

curl http://localhost:8083/connector-plugins | jq
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.4.0-33-gd2e2927-dirty"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.4.0-33-gd2e2927-dirty"
  },

I used this environment to test: https://github.com/mongodb/mongo-kafka/tree/master/docker

Steps to reproduce:
1) Change the source connector config:
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?readPreference=secondary",
2) ./run.sh
3) Wait until the connectors are running.
4) Execute on the mongo primary:

cfg = rs.conf()
cfg.members[1].priority = 0
cfg.members[1].hidden = true
cfg.members[2].priority = 0
cfg.members[2].hidden = true
rs.reconfig(cfg)

Note: I'm not sure which replica the connector is using, so I hid both of them.

Generated at Thu Feb 08 09:05:51 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.