Type: Task
Resolution: Unresolved
Priority: Unknown
Affects Version/s: None
Component/s: None
Java Drivers
When we synchronize data between two MongoDB deployments using multiple Kafka connectors, we need to be able to skip duplicate-key errors while still not tolerating any other errors (ERRORS_TOLERANCE_CONFIG = false).
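For reference, a sketch of the sink connector settings this scenario implies. This assumes the standard Kafka Connect `errors.tolerance` property and the MongoDB sink connector's `bulk.write.ordered` property (the keys behind `ERRORS_TOLERANCE_CONFIG` and `BULK_WRITE_ORDERED_CONFIG`); exact property names should be checked against the connector version in use:

```properties
# Fail the task on any error; the duplicate-key case would be
# special-cased in code as proposed below.
errors.tolerance=none
# Unordered bulk writes, so one duplicate key does not abort
# the remaining writes in the batch.
bulk.write.ordered=false
```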
As I understand it, we just need to set BULK_WRITE_ORDERED_CONFIG=false and add an extra error check to StartedMongoSinkTask:
```java
// ...
private static final int DUPLICATE_KEY_ERROR_CODE = 11000;
// TODO move this flag to settings
private final boolean ignoreDuplicateKeyError = false;
// ...

private void handleTolerableWriteException(
    final List<SinkRecord> batch,
    final boolean ordered,
    final RuntimeException e,
    final boolean logErrors,
    final boolean tolerateErrors) {
  if (e instanceof MongoBulkWriteException) {
    AnalyzedBatchFailedWithBulkWriteException analyzedBatch =
        new AnalyzedBatchFailedWithBulkWriteException(
            batch, ordered, (MongoBulkWriteException) e, errorReporter, StartedMongoSinkTask::log);
    List<BulkWriteError> errors = ((MongoBulkWriteException) e).getWriteErrors();
    if (logErrors) {
      LOGGER.error(
          "Failed to put into the sink some records, see log entries below for the details", e);
      analyzedBatch.log();
    }
    if (tolerateErrors) {
      analyzedBatch.report();
    } else if (errors.size() == 1
        && errors.get(0).getCode() == DUPLICATE_KEY_ERROR_CODE
        && ignoreDuplicateKeyError) {
      LOGGER.error("Failed to put into the sink some records: Duplicate Key Error");
    } else {
      throw new DataException(e);
    }
  } else {
    if (logErrors) {
      log(batch, e);
    }
    if (tolerateErrors) {
      batch.forEach(record -> errorReporter.report(record, e));
    } else {
      throw new DataException(e);
    }
  }
}
```
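The decision logic above can be sketched standalone, decoupled from the driver types so it runs without a MongoDB instance. Note this sketch generalizes the proposed `errors.size() == 1` check to any number of write errors, which may be more natural for unordered bulk writes where several records in a batch can hit duplicates; the class and method names here are illustrative, not part of the connector:

```java
import java.util.List;

// Sketch: given the error codes of a failed bulk write, decide whether the
// batch failure is tolerable (every failure is a duplicate key) or whether
// the task should rethrow and fail, matching ERRORS_TOLERANCE_CONFIG=false.
public class DuplicateKeyTolerance {
    // 11000 is MongoDB's duplicate key error code.
    private static final int DUPLICATE_KEY_ERROR_CODE = 11000;

    static boolean isTolerable(final List<Integer> writeErrorCodes,
                               final boolean ignoreDuplicateKeyError) {
        if (!ignoreDuplicateKeyError || writeErrorCodes.isEmpty()) {
            return false;
        }
        // Tolerate the failure only when every write error is a duplicate key;
        // any other error code means the batch must still fail the task.
        return writeErrorCodes.stream()
                .allMatch(code -> code == DUPLICATE_KEY_ERROR_CODE);
    }

    public static void main(String[] args) {
        System.out.println(isTolerable(List.of(11000), true));      // only duplicates: tolerable
        System.out.println(isTolerable(List.of(11000, 121), true)); // mixed errors: fail
        System.out.println(isTolerable(List.of(11000), false));     // flag off: fail
    }
}
```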
I've tested the change, but for some reason I can't push a new branch to the repository to open a merge request.