[KAFKA-253] Allow Kafka Sink Connector to Execute Unordered Bulk Operations Created: 29/Sep/21  Updated: 28/Oct/23  Resolved: 12/Jan/22

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: None
Fix Version/s: 1.7.0

Type: Improvement Priority: Major - P3
Reporter: Diego Rodriguez (Inactive) Assignee: Valentin Kavalenka
Resolution: Fixed Votes: 0
Labels: internal-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Documented
Epic Link: Write MongoDB errors to the DLQ
Quarter: FY22Q4
Case:
Documentation Changes: Needed
Documentation Changes Summary:

If implemented, this new behavior will need to be properly documented.


 Description   

Hi Team,

As of now, the Sink Connector executes operations only as "ordered" bulk writes, which guarantee message ordering within each source topic partition.

There might be circumstances where ordering is not required and executing "unordered" bulk operations might have benefits:

  • Increased performance, because "unordered" bulk write operations can be parallelized.
  • Preventing the whole batch of messages from failing, especially when the operation that fails is the first one in the "ordered" list.

The default should still be "ordered" bulk writes, but I propose adding a property that switches the connector to "unordered" bulk operations. The implications of this change should be made very clear in our documentation page, and it might even be wise to log a warning about message processing order.

Thanks
Diego
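
For illustration of the ordered/unordered distinction described above, here is a minimal sketch using the plain MongoDB Java driver (not connector code); the connection string, database, collection and documents are placeholders. With ordered(false) the server keeps processing the remaining operations after an individual failure and reports the failures afterwards.

{code:java}
import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import org.bson.Document;

import java.util.Arrays;
import java.util.List;

public class UnorderedBulkSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll = client.getDatabase("test").getCollection("sink");

            List<InsertOneModel<Document>> batch = Arrays.asList(
                    new InsertOneModel<>(new Document("_id", 1)),
                    new InsertOneModel<>(new Document("_id", 1)),  // duplicate key -> this operation fails
                    new InsertOneModel<>(new Document("_id", 2))); // still attempted because the bulk is unordered

            try {
                // ordered(false): the server continues past individual failures
                coll.bulkWrite(batch, new BulkWriteOptions().ordered(false));
            } catch (MongoBulkWriteException e) {
                // the surviving writes (_id 1 and _id 2) are applied; only the failures are reported here
                e.getWriteErrors().forEach(err ->
                        System.out.println("index " + err.getIndex() + ": " + err.getMessage()));
            }
        }
    }
}
{code}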



 Comments   
Comment by Githook User [ 19/Jan/22 ]

Author: Valentin Kovalenko <valentin.male.kovalenko@gmail.com> (username: stIncMale)

Message: Mention `bulk.write.ordered` in `CHANGELOG.md` (#98)

KAFKA-253
Branch: master
https://github.com/mongodb/mongo-kafka/commit/acada7ae2c9ec66f38819f01c84545ca10a0f5fa

Comment by Githook User [ 12/Jan/22 ]

Author: Valentin Kovalenko <valentin.male.kovalenko@gmail.com> (username: stIncMale)

Message: Add support for the `bulk.write.ordered` sink connector property (#96)

KAFKA-253
Branch: master
https://github.com/mongodb/mongo-kafka/commit/09ec070f9bb153aadaf62c210e0d06be23c96445

Comment by Albert Wong (Inactive) [ 14/Oct/21 ]

Problem Statement:

The Mongo Kafka Connector sink process performs bulk inserts in an ordered fashion.  As a result, if an error is encountered during the insert process (for example, violation of a unique constraint), not only does the individual record encountering the error fail, but all subsequent records in the batch also fail.  Furthermore, the failure occurs without an expected error message or routing of the failed inserts to the dead letter topic.

 

For example, if records 1-10 are being inserted into Mongo through the connector process and record #6 already exists in the target collection, not only will record #6 fail to insert, but records 7-10 will as well. None of these 5 records (6-10) will be written to the dead letter topic.
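
A hedged sketch of that scenario at the driver level (placeholder collection, assumed to already contain a document with _id 6), showing why an ordered bulk write stops at the first error:

{code:java}
import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

class OrderedFailureSketch {
    // 'coll' is assumed to already contain a document with _id = 6
    static void insertOneToTen(MongoCollection<Document> coll) {
        List<InsertOneModel<Document>> batch = new ArrayList<>();
        for (int id = 1; id <= 10; id++) {
            batch.add(new InsertOneModel<>(new Document("_id", id)));
        }
        try {
            // ordered(true) mirrors the connector's current behaviour
            coll.bulkWrite(batch, new BulkWriteOptions().ordered(true));
        } catch (MongoBulkWriteException e) {
            // the server stops at the first error: records 1-5 are inserted,
            // record #6 is reported as a duplicate key error, and records 7-10 are never attempted
            System.out.println("inserted: " + e.getWriteResult().getInsertedCount()); // prints 5
            e.getWriteErrors().forEach(err ->
                    System.out.println("failed at index " + err.getIndex() + ", code " + err.getCode()));
        }
    }
}
{code}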

 

Stories:

  1. Should a Mongo Kafka Connector sink insert/upsert write of an individual record fail for any reason, the records that fail to be written should be placed in the dead letter topic.  This should happen regardless of whether ordered or unordered writes are used.

Acceptance Criteria:

  • Any record that cannot be inserted or upserted into Mongo from Kafka using the Mongo sink connector should be placed into the dead letter topic for the Mongo sink connector after some configurable period of retries.

  2. For any insert, all records that do not error during the write process should succeed.  In the above scenario, this will result in records 1-5 and 7-10 successfully being inserted into the Mongo collection.  Record #6 will fail, and should be put into the dead letter topic as per story #1.  The writes (1-5, 7-10) should happen using the ordered condition to maintain the time series nature of the source Kafka topic.

Acceptance Criteria:

  • All records from the source Kafka topic are inserted into Mongo in sequential order
  • Any individual records that cannot be inserted (for any reason) are placed into dead letter after some period of retries (configurable)
  • The sink connector should have a parameter allowing the process to run in either an ordered or an unordered fashion

  3. For any insert, all records that do not error during the write process should succeed.  In the above scenario, this will result in records 1-5 and 7-10 successfully being inserted into the Mongo collection.  Record #6 will fail, and should be put into the dead letter topic.  These writes should happen using the unordered condition.

Acceptance Criteria:

  • All records from the source Kafka topic are inserted into Mongo in any order
  • Any individual records that cannot be inserted (for any reason) are placed into dead letter after some period of retries (configurable)
  • The sink connector should have a parameter allowing the process to run in either an ordered or an unordered fashion

  4. For any insert, all records that do not error during the write process should succeed.  An example of a failure condition would be violation of the unique constraint associated with the primary key for the collection.  In the above scenario, this will result in records 1-5 and 7-10 successfully being inserted into the Mongo collection.  Record #6 will fail, and should be put into the dead letter topic.  These writes should happen using the unordered condition.

Acceptance Criteria:

  • All records from the source Kafka topic are inserted into Mongo in any order
  • Any individual records that cannot be inserted (for any reason) are placed into dead letter after some period of retries (configurable)
  • The sink connector should have a parameter allowing the process to run in either an ordered or an unordered fashion

 

Of these four stories, Story #1 is an absolute requirement: data cannot be allowed to fall on the floor without some error notification or other means by which we can track what happened to it.  Story #2 is the ideal end result.  Story #3 is the next best option, with Story #4 being the minimal solution (the difference is that Story #3 is generic against any error, while Story #4 focuses on the specific error we have encountered.  While I would recommend the broader solution for other customers, the more focused one would be the minimum criteria).
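
Tying the stories to configuration, a hedged sketch of what such a sink setup might look like once the ordered/unordered switch exists. The `errors.*` keys are standard Kafka Connect framework settings for retries and the dead letter topic, `bulk.write.ordered` is the property this ticket adds, and the topic, URI, database and collection values are placeholders; whether individual bulk-write failures actually reach the DLQ is precisely what the stories above ask for, so the `errors.*` part describes the desired end state rather than guaranteed behaviour of the current version.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class SinkConfigSketch {
    public static Map<String, String> unorderedSinkWithDlq() {
        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
        config.put("topics", "orders");                                // placeholder source topic
        config.put("connection.uri", "mongodb://localhost:27017");     // placeholder URI
        config.put("database", "test");
        config.put("collection", "orders");

        // the switch proposed in this ticket; true (ordered) stays the default
        config.put("bulk.write.ordered", "false");

        // standard Kafka Connect error handling: tolerate record-level failures and
        // route failing records to a dead letter topic instead of killing the task
        config.put("errors.tolerance", "all");
        config.put("errors.deadletterqueue.topic.name", "orders.dlq"); // placeholder DLQ topic
        config.put("errors.deadletterqueue.context.headers.enable", "true");
        config.put("errors.retry.timeout", "60000");                   // total retry window in ms
        config.put("errors.retry.delay.max.ms", "5000");
        return config;
    }
}
{code}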
