[KAFKA-85] java.lang.IllegalStateException: Queue full is thrown when copy.existing is set to true Created: 14/Feb/20  Updated: 28/Oct/23  Resolved: 14/Feb/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: 1.0.1

Type: Bug Priority: Major - P3
Reporter: Sendoh Daten Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

mongo-kafka-connect: 1.0.0
Kafka: 2.4.0
Kafka connect: 2.4.0
MongoDB server: 3.6.14
mongodb-driver-sync: 3.12.1


 Description

When importing a 10 GB collection with copy.existing set to true, I receive the following error. All other settings use the defaults.

org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalStateException: Queue full
	at com.mongodb.kafka.connect.source.MongoCopyDataManager.poll(MongoCopyDataManager.java:95)
	at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:301)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:154)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(Unknown Source)
	at java.base/java.util.concurrent.ArrayBlockingQueue.add(Unknown Source)
	at com.mongodb.client.internal.Java8ForEachHelper.forEach(Java8ForEachHelper.java:30)
	at com.mongodb.client.internal.Java8AggregateIterableImpl.forEach(Java8AggregateIterableImpl.java:54)
	at com.mongodb.kafka.connect.source.MongoCopyDataManager.copyDataFrom(MongoCopyDataManager.java:123)
	at com.mongodb.kafka.connect.source.MongoCopyDataManager.lambda$new$0(MongoCopyDataManager.java:87)
	... 5 more

 

Any suggestions on how to resolve it?



 Comments   
Comment by Sendoh Daten [ 19/Feb/20 ]

Our data pipeline has been switched to Debezium, due to our priorities and its SMT documentation.

Comment by Sendoh Daten [ 17/Feb/20 ]

Has the snapshot been released yet? https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect. I would like to use the snapshot because we urgently need to import data with mongo-kafka-connect.

Comment by Githook User [ 14/Feb/20 ]

Author:

{'username': 'rozza', 'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com'}

Message: Copying data - ensure the queue blocks if full

KAFKA-85
Branch: master
https://github.com/mongodb/mongo-kafka/commit/7e6bf97742f2ad75cde394d088823b86880cdf4e

Comment by Ross Lawley [ 14/Feb/20 ]

A SNAPSHOT will be released to sonatype first.

The 1.0.1 release is most likely to be in a couple of weeks.

Ross

Comment by Sendoh Daten [ 14/Feb/20 ]

Once the PR is merged, will 1.0.1 be released immediately, or is there a release cycle?

Comment by Ross Lawley [ 14/Feb/20 ]

PR: https://github.com/rozza/mongo-kafka/pull/10

Comment by Ross Lawley [ 14/Feb/20 ]

I think poll.await.time.ms and copy.existing.queue.size are key. The larger the queue, the more memory is used for the initial copy, and the smaller the await time, the quicker Kafka will publish any polled events to a topic.

Ultimately, the error here is that MongoDB is pushing to the queue faster than it is being consumed. I have a fix and will put it through code review: using queue.put instead of queue.add ensures the queue stays within its size limit.

Ross
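
The add-vs-put distinction Ross describes can be seen with a plain ArrayBlockingQueue (a minimal standalone sketch, not the connector's actual code): add throws the same "Queue full" IllegalStateException seen in the stack trace above, while offer/put reject or block instead.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueFullDemo {

    // Reproduces the failure mode: add() on a full ArrayBlockingQueue
    // throws IllegalStateException("Queue full"), exactly as in the
    // AbstractQueue.add frame of the reported stack trace.
    public static String addToFullQueue() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.add(1);
        queue.add(2);
        try {
            queue.add(3); // capacity exceeded -> throws
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage(); // "Queue full"
        }
    }

    // offer() is the non-throwing variant: it returns false when full.
    // put() (not shown here) instead blocks until space is available,
    // which is the behaviour the fix relies on.
    public static boolean offerToFullQueue() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1);
        return queue.offer(2); // false: element rejected, no exception
    }

    public static void main(String[] args) {
        System.out.println(addToFullQueue());
        System.out.println(offerToFullQueue());
    }
}
```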

Comment by Sendoh Daten [ 14/Feb/20 ]

Having checked the implementation, is the following reasoning in the right direction?

poll.await.time.ms -> smaller, so the queue is polled faster
poll.max.batch.size -> larger, to poll more from the queue
copy.existing.queue.size -> larger, to buffer more before producing to Kafka
batch.size -> does not matter; MongoCopyDataManager does not use it
copy.existing.max.threads -> smaller, so MongoCopyDataManager returns data more slowly
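
Putting the tuning directions above into a concrete config fragment (the values are illustrative placeholders, not tested recommendations):

```json
{
    "connector.class": "MongoSourceConnector",
    "connection.uri": "mongodb://dbmgo:27017",
    "database": "test",
    "collection": "notifications",
    "copy.existing": "true",
    "copy.existing.queue.size": "160000",
    "copy.existing.max.threads": "1",
    "poll.await.time.ms": "1000",
    "poll.max.batch.size": "2000"
}
```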

Comment by Sendoh Daten [ 14/Feb/20 ]

Thanks. I already tested a larger copy.existing.queue.size, e.g. 160,000, but it still fails after about one minute. I also think setting a small batch.size would be better. Are there other configs to test together?

My config:

{
    "name": "partner-test-1",
    "config": {
        "connector.class": "MongoSourceConnector",
        "errors.retry.timeout": "300",
        "database": "test",
        "copy.existing": "true",
        "topic.prefix": "mongodb-partner",
        "batch.size": "1",
        "connection.uri": "mongodb://dbmgo:27017",
        "name": "partner-test-1",
        "collection": "notifications",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    },
    "tasks": [
        {
            "connector": "partner-test-1",
            "task": 0
        }
    ],
    "type": "source"
}
 

Comment by Ross Lawley [ 14/Feb/20 ]

Hi unicorn.banachi@gmail.com,

An interim solution would be to configure copy.existing.queue.size to be much larger than the default value of 16,000. Reviewing the code, the copy process simply adds data to the queue as it receives it from MongoDB. There are no checks on the queue size, so if there is a delay pushing the data into Kafka, the queue can throw because it is full.

I'll look to see if this process can be improved and potentially add some blocking, to ensure the queue doesn't try to overflow.

Ross
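
The blocking behaviour Ross proposes can be sketched with a producer thread feeding a small bounded queue via put() (a hypothetical simplified model of the copy process, not the connector's code): put() applies backpressure, so even a queue far smaller than the data set never overflows.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingCopyDemo {

    // Copies docCount synthetic "documents" through a bounded queue of
    // queueSize slots. The copier uses put(), which blocks while the
    // queue is full instead of throwing, so no element is ever lost.
    public static int copyWithBackpressure(int docCount, int queueSize)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueSize);

        Thread copier = new Thread(() -> {
            for (int i = 0; i < docCount; i++) {
                try {
                    queue.put("doc-" + i); // blocks when full: backpressure
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        copier.start();

        // Consumer drains one document at a time, as poll() would.
        int consumed = 0;
        while (consumed < docCount) {
            queue.take();
            consumed++;
        }
        copier.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        // 10,000 docs through a 16-slot queue: all arrive, nothing overflows.
        System.out.println(copyWithBackpressure(10_000, 16));
    }
}
```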

Comment by Ross Lawley [ 14/Feb/20 ]

Hi unicorn.banachi@gmail.com,

Thanks for the ticket, I'll investigate the cause and workarounds. Just to clarify, are you importing a single collection? Can you share an example of your source connector configuration?

Ross

Generated at Thu Feb 08 09:05:32 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.