[KAFKA-115] Connector stops itself after db timeout. Created: 15/Jun/20  Updated: 27/Oct/23  Resolved: 15/Jun/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Gil Vander Marcken Assignee: Ross Lawley
Resolution: Works as Designed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
is related to KAFKA-105 Support errors.tolerance Closed

 Description   

We are running a single-instance MongoDB replica set, and sometimes this replica set changes availability zones. When this occurs, it can take some time for the db to become available again (typically 2 minutes).
During this time the connector loses the connection and shuts itself down:

com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongo:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: smatch-hub-mongo}, caused by {java.net.UnknownHostException: smatch-hub-mongo}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:152)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:103)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:284)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:188)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:203)
	at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
	at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:128)
	at com.mongodb.client.internal.ChangeStreamIterableImpl$1.iterator(ChangeStreamIterableImpl.java:123)
	at com.mongodb.kafka.connect.source.MongoSourceTask.tryCreateCursor(MongoSourceTask.java:242)
	at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:226)
	at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:333)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:155)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

The only way to recover is then to manually restart the connector:

Task is being killed and will not recover until manually restarted

Would it be possible to configure the behaviour of the source connector when it encounters com.mongodb.MongoTimeoutException?



 Comments   
Comment by Ross Lawley [ 16/Jun/20 ]

Hi gilvdm@gmail.com,

A longer timeout would amount to the same thing as a retry mechanism. KAFKA-105 will look into supporting errors.tolerance in a similar way to the sink connector. The sink connector also has a retry mechanism, but with retryable writes and reads now supported by the driver, the need for that feature is reduced.
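For context, the sink-side error tolerance mentioned here is configured through the standard Kafka Connect error-handling properties. A sketch of such a sink configuration fragment (the dead-letter topic name is a placeholder):

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.deadletterqueue.topic.name": "dlq.mongo-sink"
}
```

With "errors.tolerance": "all", the framework skips records that fail conversion or processing instead of killing the task; KAFKA-105 tracks bringing comparable behaviour to the source connector.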

The issue here is that a MongoTimeoutException could mean either an improperly configured connector or a re-election and a primary node change. However, 30 seconds (the default server selection timeout) should be long enough for a re-election of a primary node. I understand that in your case it isn't, but there is configuration you can use for your scenario.

Ross

Comment by Gil Vander Marcken [ 15/Jun/20 ]

Hi Ross Lawley,

Thank you for your feedback; I did not know about setting serverSelectionTimeoutMS in the MongoDB connection URL.
However, I am not sure it will be sufficient, or whether in my situation I really need a retry.

I don't think I need MongoDB support: the db being unavailable for a certain amount of time is a fact, and it's out of their hands; the question is really how the connector should behave in such a case. I understand the exception is emanating from com.mongodb internals, but com.mongodb.kafka.connect.source.MongoSourceTask should be able to handle MongoTimeoutException and offer a retry mechanism instead of flat out stopping. For further reference/inspiration, the Debezium MongoDB connector has this feature.

Comment by Ross Lawley [ 15/Jun/20 ]

Hi gilvdm@gmail.com,

Thanks for your report. For future reference, please note that this project is for reporting bugs or feature suggestions for the driver. For MongoDB-related support, or a question like this that involves more discussion, please post on the MongoDB Community Forums.

You can change the server selection timeout to one suitable for your situation via the connection URL, e.g.: mongodb://localhost:27017/?serverSelectionTimeoutMS=60000
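Applied to the source connector described in this ticket, the longer timeout would go into the connection.uri of the connector config. A sketch, assuming a 2-minute outage window as reported; the connector name, host, database, and collection are placeholders:

```json
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo:27017/?serverSelectionTimeoutMS=120000",
    "database": "mydb",
    "collection": "mycollection"
  }
}
```

Setting serverSelectionTimeoutMS above the expected unavailability window means the driver keeps waiting for a reachable server instead of raising the MongoTimeoutException that kills the task.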

KAFKA-105 will look at error tolerance for the connector in greater depth.

Ross

Generated at Thu Feb 08 09:05:37 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.