[KAFKA-85] java.lang.IllegalStateException: Queue full is thrown when copy.existing is set to true Created: 14/Feb/20 Updated: 28/Oct/23 Resolved: 14/Feb/20 |
|
| Status: | Closed |
| Project: | Kafka Connector |
| Component/s: | None |
| Affects Version/s: | None |
| Fix Version/s: | 1.0.1 |
| Type: | Bug | Priority: | Major - P3 |
| Reporter: | Sendoh Daten | Assignee: | Ross Lawley |
| Resolution: | Fixed | Votes: | 0 |
| Labels: | None | ||
| Remaining Estimate: | Not Specified | ||
| Time Spent: | Not Specified | ||
| Original Estimate: | Not Specified | ||
| Environment: | mongo-kafka-connect: 1.0.0 |
| Case: | (copied to CRM) |
| Description |
|
When I try to import a 10G collection with copy.existing set to true, I receive the IllegalStateException: Queue full error above. All other settings use the defaults.
Any suggestion to resolve it? |
| Comments |
| Comment by Sendoh Daten [ 19/Feb/20 ] |
|
Our data pipeline has been switched to Debezium, due to priority and its SMT documentation. |
| Comment by Sendoh Daten [ 17/Feb/20 ] |
|
Is the snapshot already released? https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect. I would like to use the snapshot because we need to import data via mongo-kafka-connect urgently. |
| Comment by Githook User [ 14/Feb/20 ] |
|
Author: rozza (Ross Lawley, ross.lawley@gmail.com)
Message: Copying data - ensure the queue blocks if full |
| Comment by Ross Lawley [ 14/Feb/20 ] |
|
A SNAPSHOT will be released to Sonatype first. The 1.0.1 release will most likely be in a couple of weeks. Ross |
| Comment by Sendoh Daten [ 14/Feb/20 ] |
|
Once the PR is merged, will 1.0.1 be released immediately, or is there a release cycle? |
| Comment by Ross Lawley [ 14/Feb/20 ] | ||||||||||||||||||||||||
|
I think poll.await.time.ms and copy.existing.queue.size are key: the larger the queue, the more memory is used for the initial copy, and the smaller the await time, the quicker Kafka will publish any polled events to a topic. Ultimately, the error here means MongoDB is pushing to the queue faster than it is being consumed. I have a fix that I will put through code review: using queue.put instead of queue.add, which ensures the queue stays within its size limit. Ross |
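The behavioral difference Ross describes comes from `java.util.concurrent.BlockingQueue`: `add()` throws `IllegalStateException: Queue full` when a bounded queue has no free capacity, while `put()` blocks the producer until a consumer frees a slot, giving backpressure. A minimal standalone sketch (a toy capacity of 2 stands in for the connector's real queue; class and method names here are illustrative, not the connector's code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueFullDemo {

    // add() throws IllegalStateException("Queue full") on a full bounded queue;
    // this helper returns the exception message, or null when the add succeeds.
    static String tryAdd(BlockingQueue<String> queue, String item) {
        try {
            queue.add(item);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Tiny capacity stands in for copy.existing.queue.size (default 16000).
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("doc1");
        queue.add("doc2");

        // The producer has outrun the consumer: a third add() fails fast.
        System.out.println("add() on full queue: " + tryAdd(queue, "doc3"));

        // put() instead blocks until a consumer drains an element.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.take(); // free one slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("doc3"); // waits ~100 ms for the take(), then succeeds
        consumer.join();
        System.out.println("queue size after put(): " + queue.size());
    }
}
```

This is why enlarging the queue only delays the failure with `add()`: any sustained producer/consumer rate mismatch eventually fills the buffer, whereas `put()` throttles the copy to the consumer's pace.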
| Comment by Sendoh Daten [ 14/Feb/20 ] |
|
Checking the implementation, is the following thinking in the right direction? poll.await.time.ms -> small, so we poll from the queue faster. |
| Comment by Sendoh Daten [ 14/Feb/20 ] |
|
Thanks. I already tested a larger queue size, 160,000, but it still fails after about 1 min. I also think setting a small batch.size would be better. Are there other configs worth testing together? My config: |
| Comment by Ross Lawley [ 14/Feb/20 ] |
|
An interim solution is to configure copy.existing.queue.size to be much larger than the default value of 16,000. Reviewing the code, the copy process simply adds data to the queue as it receives it from MongoDB. There are no checks on the queue size, so if there is a delay pushing the data into Kafka, the queue can throw because it is full. I'll look at whether this process can be improved, potentially adding some blocking to ensure the queue doesn't overflow. Ross |
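To illustrate the interim workaround, a source connector configuration raising the queue size might look like the fragment below. The property names `copy.existing`, `copy.existing.queue.size`, and `poll.await.time.ms` come from this thread; the URI, database, collection, and the specific values are placeholders, not recommendations:

```properties
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=mydb
collection=mycollection
copy.existing=true
# Default is 16000; a larger buffer absorbs bursts during the initial copy.
copy.existing.queue.size=64000
# A smaller await time makes poll() return, and drain the queue, sooner.
poll.await.time.ms=1000
```

Note that, per the discussion above, enlarging the queue only postpones the failure on 1.0.0; the real fix is the blocking behavior shipped in 1.0.1.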
| Comment by Ross Lawley [ 14/Feb/20 ] |
|
Thanks for the ticket, I'll investigate the cause and workarounds. Just to clarify: are you importing a single collection? Can you share an example of your source connector configuration? Ross |