[KAFKA-92] Kafka connector - Number of sink tasks Created: 22/Mar/20  Updated: 28/Oct/23  Resolved: 19/May/20

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: 1.0
Fix Version/s: 1.2.0

Type: Epic Priority: Major - P3
Reporter: Rajaramesh Yaramati Assignee: Ross Lawley
Resolution: Fixed Votes: 1
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

AWS EC2, Sharded cluster


Issue Links:
Related
Case:
Detailed Project Statuses:

Engineer(s): Ross

This epic is more like a one-off ticket. We converted this work to an epic so that it showed up on our board for quarterly planning.
The actual work required for this ticket turned out to be easier than expected and is now in code review.



 Description   
Epic Summary

Summary

The Kafka sink connector only ever supports a single task. Users should be able to use the tasks.max setting to increase parallelism with the connector:

tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism. 
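For illustration, here is a minimal sketch of how a Kafka Connect sink connector can honour tasks.max (an editor's example against the standard Kafka Connect API, not the actual MongoDB connector source; ExampleSinkConnector and ExampleSinkTask are hypothetical names, with ExampleSinkTask sketched in a later comment). The framework passes the configured tasks.max value into taskConfigs(), and the connector decides how many task configurations to actually return:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class ExampleSinkConnector extends SinkConnector {
    private Map<String, String> settings;

    @Override
    public void start(Map<String, String> props) {
        settings = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return ExampleSinkTask.class; // hypothetical SinkTask, sketched further down
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // maxTasks is the tasks.max value from the connector configuration.
        // Returning maxTasks copies of the settings lets Kafka Connect start that
        // many tasks and spread the topic partitions across them; always returning
        // a single entry (the old behaviour) pins the connector to one task.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(settings));
        }
        return configs;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "1.2.0";
    }
}

With tasks.max=4 and a topic that has at least 4 partitions, Kafka Connect would then run 4 sink tasks in parallel.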


Was:

I am testing this MongoDB sink connector to migrate large datasets (multi-TB) from one MongoDB cluster to another. A challenge I am facing is throughput on the sink side. Irrespective of the tasks.max parameter, only one sink task is created. I then found the two tickets below, related to limiting the number of sink tasks to one. In this case, how do I improve throughput on the sink side? I am also curious to know why the number of tasks is limited to one. Are there any plans to improve this?
https://jira.mongodb.org/browse/KAFKA-62
https://jira.mongodb.org/browse/KAFKA-46



 Comments   
Comment by Githook User [ 19/May/20 ]

Author:

Ross Lawley (rozza) <ross.lawley@gmail.com>

Message: Allow the Sink connector to use multiple tasks

Kafka Connect provides built-in support for parallelism and scalable data copying
by assigning topic partitions to tasks. This provides parallelism at the cost of no
longer processing the data sequentially across the whole topic.

When using multiple tasks, data may be processed out of order relative to the order of
the topic. Each task will be assigned partitions from a topic and these will be
processed independently of the other partitions.

KAFKA-92
Branch: master
https://github.com/mongodb/mongo-kafka/commit/32f5458946d976d63d26be7a0f515be176c2cb14

Comment by Martin Andersson [ 12/May/20 ]

The only complication is that tasks will process the messages as they see them, and this can lead to out-of-order processing of data from a topic when using multiple tasks. That could be an issue depending on the user's dataflow.

As long as there is only one task processing each Kafka partition (and tasks flush their data on a rebalance), then it seems to me that the connector would hold the same ordering guarantees as Kafka (i.e. ordering is only guaranteed per topic-partition). As a user of this connector I wouldn't expect anything more.

Comment by Ross Lawley [ 12/May/20 ]

PR: https://github.com/rozza/mongo-kafka/pull/17

For the sink, we can hand control of processing the messages from a topic over to Kafka, so multiple tasks will be assigned messages based on their topic-partition automatically.

The only complication is that tasks will process the messages as they see them, and this can lead to out-of-order processing of data from a topic when using multiple tasks. That could be an issue depending on the user's dataflow.
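(Editor's illustration, not the connector's actual code: a minimal SinkTask against the standard Kafka Connect API showing what "assigned based on their topic-partition" means. ExampleSinkTask is a hypothetical name. Kafka Connect calls open() with the partitions a task owns and delivers records for only those partitions to put(), so ordering is preserved within a partition but not across the whole topic.)

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Kafka Connect tells each task which topic partitions it now owns.
        // No two tasks own the same partition at the same time.
        System.out.println("Assigned partitions: " + partitions);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Records arrive in offset order within each partition; with several
            // tasks the interleaving across partitions is not defined, which is
            // the out-of-order behaviour discussed above.
            System.out.printf("%s-%d @ offset %d%n",
                    record.topic(), record.kafkaPartition(), record.kafkaOffset());
        }
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "1.2.0";
    }
}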

Comment by Alexey Menshikov [ 30/Apr/20 ]

The customer is trying to reach a 10k insert rate for 2 KB documents, but it seems that is not possible using just one sink connector. The database handles multiple inserting threads better than a single one. The ability to increase the level of parallelism should significantly increase the insertion rate.

Comment by Ross Lawley [ 01/Apr/20 ]

Hi yaramati@adobe.com,

I'm glad you were able to work around the issue and improve the parallelism.

We could also potentially use a thread pool of workers mapped per topic to improve the concurrency across topics

This could be a future improvement to the connector: by using a thread pool of workers, with one worker thread per topic, it would remove the need to register multiple connectors while still allowing the same parallelism from a single registered connector.

Ross
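(Editor's illustration of the thread-pool-per-topic idea described above; this is a hypothetical sketch, not an existing feature of the connector, and PerTopicDispatcher is an invented name. Records are grouped by topic and each topic gets its own single-threaded executor, so writes stay ordered within a topic while different topics are written concurrently.)

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.kafka.connect.sink.SinkRecord;

public class PerTopicDispatcher {

    // One single-threaded executor per topic: per-topic ordering is kept,
    // but different topics are written to MongoDB concurrently.
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    public void dispatch(Collection<SinkRecord> records) {
        Map<String, List<SinkRecord>> byTopic =
                records.stream().collect(Collectors.groupingBy(SinkRecord::topic));
        byTopic.forEach((topic, batch) ->
                executors.computeIfAbsent(topic, t -> Executors.newSingleThreadExecutor())
                        .submit(() -> writeBatch(topic, batch)));
    }

    private void writeBatch(String topic, List<SinkRecord> batch) {
        // Placeholder for the bulk write to the MongoDB collection mapped to this topic.
        System.out.printf("Writing %d records for topic %s%n", batch.size(), topic);
    }

    public void close() {
        executors.values().forEach(ExecutorService::shutdown);
    }
}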

Comment by Rajaramesh Yaramati [ 31/Mar/20 ]

Thank you, Ross Lawley, for addressing this ticket.

For the use case I have (data migration from one cluster to another), sequential writes on the target are required. I realized this only after learning the reason why there is only one sink task.

Now I have created a topic per shard, and at least I am getting sink task parallelism equal to the number of shards.

Can you please elaborate on what you mean by "We could also potentially use a thread pool of workers mapped per topic to improve the concurrency across topics"?

Thanks,

Rajaramesh Yaramati

Comment by Ross Lawley [ 31/Mar/20 ]

Hi yaramati@adobe.com,

Thanks for the ticket. At the moment only a single task is used when writing data to MongoDB. This is by design, to ensure that all writes are sequential and in the order seen on the topic. If you are watching multiple topics, it may be more efficient to set up a connector per topic, as that would improve the concurrency of the operations.

Do you require all writes to happen sequentially and in order? If that is not the case, then a new feature could potentially be added to allow parallel tasks.
Are you watching multiple topics? We could also potentially use a thread pool of workers mapped per topic to improve the concurrency across topics.

Kind Regards,

Ross Lawley
