[KAFKA-223] Write MongoDB errors to the DLQ Created: 19/May/21  Updated: 28/Oct/23  Resolved: 12/Jan/22

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: 1.5.0
Fix Version/s: 1.7.0

Type: Epic Priority: Major - P3
Reporter: Louis DEVERGNIES Assignee: Valentin Kavalenka
Resolution: Fixed Votes: 2
Labels: external-user, size-medium
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Quarter: FY22Q4
Case:
Start date:
End date:
Calendar Time: 7 weeks, 1 day
Scope Cost Estimate: 8
Cost to Date: 6
Final Cost Estimate: 6
Cost Threshold %: 100
Detailed Project Statuses:

Engineer(s): Valentin

Summary: The MongoDB Kafka Sink Connector should report sink records that it fails to write to the sink to the dead letter queue (DLQ) when the connector is configured to tolerate errors and continue processing other records instead of failing fast and requiring manual intervention.

2022-01-11: Setting initial target end date to 2022-01-28

  • Basic writing to DLQ is completed
  • Special handling for unordered bulk operations in progress


 Description   

Documentation:

errors.deadletterqueue.topic.name: Name of topic to use as the dead letter queue.

 

But:

When errors.deadletterqueue.topic.name is a valid Kafka topic name, and an exception happens at com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:209), then no message is sent to the DLQ.
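For context, a sink configuration under which this scenario can arise might look like the following sketch; the connector name, topic, connection URI, and database/collection are placeholders, not values from the report:

# Hypothetical sink connector configuration (standalone worker properties style).
name=mongo-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=orders
connection.uri=mongodb://localhost:27017
database=test
collection=orders

# Tolerate errors and route failed records to a dead letter queue topic.
errors.tolerance=all
errors.deadletterqueue.topic.name=orders-dlq
errors.deadletterqueue.context.headers.enable=true

With errors.tolerance=all and a DLQ topic configured, the expectation is that records failing in bulkWriteBatch land on the DLQ topic rather than being silently dropped.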

 

The cause is probably the catch block, which does not use the ErrantRecordReporter:

private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
  ...
  try {
    ...
  } catch (MongoException e) {
    LOGGER.warn(
        "Writing {} document(s) into collection [{}] failed.",
        writeModels.size(),
        namespace.getFullName());
    // Delegates error handling, but the failed records are never reported to the DLQ.
    handleMongoException(config, e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new DataException("Rate limiting was interrupted", e);
  } catch (Exception e) {
    if (config.logErrors()) {
      LOGGER.error("Failed to write mongodb documents", e);
    }
    if (!config.tolerateErrors()) {
      throw new DataException("Failed to write mongodb documents", e);
    }
    // When errors are tolerated, the batch is silently dropped here instead of
    // being handed to the ErrantRecordReporter.
  }
}
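For comparison, a minimal sketch of what DLQ reporting could look like, assuming the task has access to its SinkTaskContext and that MongoProcessedSinkRecordData exposes the original SinkRecord (the getSinkRecord() name is assumed here). Kafka Connect's ErrantRecordReporter, available since Connect 2.6 via SinkTaskContext.errantRecordReporter(), is what actually routes a record to the configured DLQ topic, and the quoted catch blocks never call it.

import java.util.List;

import com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Hypothetical helper, not the connector's shipped code: hand every record of a
// failed batch to the DLQ when error tolerance is enabled.
private void reportBatchToDlq(
    final SinkTaskContext context,
    final List<MongoProcessedSinkRecordData> batch,
    final Exception error) {
  // Returns null when no DLQ is configured or the Connect runtime predates the API.
  ErrantRecordReporter reporter = context.errantRecordReporter();
  if (reporter == null) {
    return;
  }
  for (MongoProcessedSinkRecordData processed : batch) {
    // getSinkRecord() is assumed to return the original SinkRecord.
    reporter.report(processed.getSinkRecord(), error);
  }
}

The actual 1.7.0 fix may be structured differently; this only illustrates the reporter API that the catch blocks above bypass.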

Developer notes

The scope (WRITING-9407) was rewritten, and this section was moved there.



 Comments   
Comment by Charles Scyphers [ 25/Oct/21 ]

@Diego

The optimal solution would be for records 7-10 to also be inserted, with only the records that hit the duplicate key exception placed in the DLQ. However, the absolute minimum requirement is that the failed records go into the DLQ so users are aware of the insert failure and can remediate as needed.

Comment by Diego Rodriguez (Inactive) [ 22/Oct/21 ]

When this ticket is implemented, what will happen with the messages that are not written because they are positioned in the bulk operation after the one that fails? Will those be sent to the DLQ?

Example:

  • Records 1-10 are being inserted into MongoDB through the Sink Connector
  • Record 6 already exists in the target collection and results in duplicate key exception
  • Record 6 fails to be inserted and is sent to the DLQ, but records 7-10 will not be inserted either. After this ticket, will records 7-10 be sent to the same DLQ too?
Comment by Albert Wong (Inactive) [ 19/Oct/21 ]

Problem Statement:

The Mongo Kafka Connector sink process performs bulk inserts in an ordered fashion.  As a result, if an error is encountered during the insert process (for example, violation of a unique constraint), not only does the individual record encountering the error fail, but all subsequent records in the batch fail as well.  Furthermore, the failure occurs without the expected error message or routing of the failed inserts to the dead letter topic.

 

For example, if records 1-10 are being inserted into Mongo through the connector process and record #6 already exists in the target collection, not only will record #6 fail to insert, but records 7-10 will as well. None of these five records (6-10) will be written to the dead letter topic.
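To make the ordered-versus-unordered behaviour concrete, here is a small standalone sketch against the MongoDB Java driver; the connection string, namespace, and documents are invented for the example. With the default ordered bulk write, the duplicate key on record #6 aborts the batch so 7-10 are never attempted; with ordered(false) the server attempts every write and reports only the failures.

import java.util.ArrayList;
import java.util.List;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;

import org.bson.Document;

public class OrderedVsUnorderedBulkWrite {
  public static void main(String[] args) {
    MongoCollection<Document> collection =
        MongoClients.create("mongodb://localhost:27017") // placeholder URI
            .getDatabase("test")
            .getCollection("orders"); // placeholder namespace

    // Records 1-10; assume a document with _id 6 already exists in the collection.
    List<WriteModel<Document>> writes = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
      writes.add(new InsertOneModel<>(new Document("_id", i)));
    }

    try {
      // ordered(true) is the driver default: the duplicate key on _id 6 stops the
      // batch, leaving 7-10 unwritten. ordered(false) attempts all ten writes and
      // only _id 6 fails.
      collection.bulkWrite(writes, new BulkWriteOptions().ordered(false));
    } catch (MongoBulkWriteException e) {
      // Each BulkWriteError carries the index of the failing write model, which is
      // exactly what per-record DLQ reporting needs.
      e.getWriteErrors()
          .forEach(
              err -> System.out.println("write " + err.getIndex() + " failed: " + err.getMessage()));
    }
  }
}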

 

Stories:

  1. Should a Mongo Kafka Connector sink insert/upsert write of an individual record fail for any reason, the records that fail to be written should be placed in the dead letter topic.  This should happen regardless of ordered or unordered mode.

Acceptance Criteria:

  • Any record that cannot be inserted or upserted into Mongo from Kafka using the Mongo sink connector should be placed into the dead letter topic for the Mongo sink connector after some configurable period of retries

  2. For any insert, all records that do not error during the write process should succeed.  In the above scenario, this will result in records 1-5 and 7-10 successfully being inserted into the Mongo collection.  Record #6 will fail and should be put into the dead letter topic as per story #1.  The writes (1-5, 7-10) should happen using the ordered condition to maintain the time series nature of the source Kafka topic.

Acceptance Criteria:

  • All records from the source Kafka topic are inserted into Mongo in sequential order
  • Any individual records that cannot be inserted (for any reason) are placed into the dead letter topic after some configurable period of retries
  • The sink connector should have a parameter allowing the process to run in either an ordered or an unordered fashion

  3. For any insert, all records that do not error during the write process should succeed.  In the above scenario, this will result in records 1-5 and 7-10 successfully being inserted into the Mongo collection.  Record #6 will fail and should be put into the dead letter topic.  These writes should happen using the unordered condition (see the sketch after this list).

Acceptance Criteria:

  • All records from the source Kafka topic are inserted into Mongo in any order
  • Any individual records that cannot be inserted (for any reason) are placed into the dead letter topic after some configurable period of retries
  • The sink connector should have a parameter allowing the process to run in either an ordered or an unordered fashion

  4. For any insert, all records that do not error during the write process should succeed.  An example of a failure condition would be violation of the unique constraint associated with the primary key for the collection.  In the above scenario, this will result in records 1-5 and 7-10 successfully being inserted into the Mongo collection.  Record #6 will fail and should be put into the dead letter topic.  These writes should happen using the unordered condition.

Acceptance Criteria:

  • All records from the source Kafka topic are inserted into Mongo in any order
  • Any individual records that cannot be inserted (for any reason) are placed into the dead letter topic after some configurable period of retries
  • The sink connector should have a parameter allowing the process to run in either an ordered or an unordered fashion
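A rough sketch of the per-record reporting that stories #3 and #4 describe, assuming the connector keeps the processed batch in the same order as the write models it submits to bulkWrite() and that MongoProcessedSinkRecordData exposes the original SinkRecord (both are assumptions here, not confirmed implementation details):

import java.util.List;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData;

import org.apache.kafka.connect.sink.ErrantRecordReporter;

// Illustrative only: report just the writes that failed inside an unordered bulk
// operation, letting the successful ones stand.
private void reportFailedWrites(
    final ErrantRecordReporter reporter,
    final List<MongoProcessedSinkRecordData> batch,
    final MongoBulkWriteException bulkError) {
  bulkError
      .getWriteErrors()
      .forEach(
          writeError ->
              // getIndex() points at the failing write model; batch is assumed to be
              // in the same order, and getSinkRecord() is assumed to return the
              // original Kafka record.
              reporter.report(batch.get(writeError.getIndex()).getSinkRecord(), bulkError));
}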

 

Of these four stories, Story #1 is an absolute requirement: data cannot be allowed to fall on the floor without some error notification or other means by which we can track what happened to it.  Story #2 is the ideal end result.  Story #3 is the next best option, and Story #4 is the minimal solution.  (The difference between stories #3 and #4 is that #3 is generic against any error, while #4 focuses on the specific error we have encountered.  I would recommend the broader solution for other customers, but the more focused one meets the minimum criteria.)

Comment by Charles Scyphers [ 28/Sep/21 ]

This caused production impact to end customers in our environment.  We had transactions error out and not be delivered, when we would have expected the failed inserts/updates/upserts to land in the dead letter queue (as there was a problem with the operation).
