Uploaded image for project: 'Kafka Connector'
  1. Kafka Connector
  2. KAFKA-403

skip duplicate key error

    • Type: Icon: Task Task
    • Resolution: Unresolved
    • Priority: Icon: Unknown Unknown
    • 1.15.0
    • Affects Version/s: None
    • Component/s: None
    • None
    • Java Drivers

      When we synchronize data between two MongoDBs using multiple Kafka connectors we need to be able to skip duplicate key error but not tolerate to all other errors (
      ERRORS_TOLERANCE_CONFIG = false).

      As I understand we just need to use BULK_WRITE_ORDERED_CONFIG=false add an extra error check to the StartedMongoSinkTask:

      .....
      private static final int DUPLICATE_KYE_ERROR_CODE = 11000;
      // TODO move this flag to settings
      private final boolean ignoreDuplicateKeyError = false;
      .......
      .......
      
      private void handleTolerableWriteException(
          final List<SinkRecord> batch,
          final boolean ordered,
          final RuntimeException e,
          final boolean logErrors,
          final boolean tolerateErrors) {
        if (e instanceof MongoBulkWriteException) {
          AnalyzedBatchFailedWithBulkWriteException analyzedBatch =
              new AnalyzedBatchFailedWithBulkWriteException(
                  batch,
                  ordered,
                  (MongoBulkWriteException) e,
                  errorReporter,
                  StartedMongoSinkTask::log);
          List<BulkWriteError> errors = ((MongoBulkWriteException) e).getWriteErrors();
      
          if (logErrors) {
            LOGGER.error(
                "Failed to put into the sink some records, see log entries below for the details", e);
            analyzedBatch.log();
          }
          if (tolerateErrors) {
            analyzedBatch.report();
          } else if (errors.size() == 1
              && errors.get(0).getCode() == DUPLICATE_KYE_ERROR_CODE
              && ignoreDuplicateKeyError) {
            LOGGER.error("Failed to put into the sink some records: Duplicate Key Error");
          } else {
            throw new DataException(e);
          }
        } else {
          if (logErrors) {
            log(batch, e);
          }
          if (tolerateErrors) {
            batch.forEach(record -> errorReporter.report(record, e));
          } else {
            throw new DataException(e);
          }
        }
      } 

       I've done tests and changed the code, but for some reason I can't add a new branch to the repo for the merge request.

            Assignee:
            Unassigned Unassigned
            Reporter:
            sergiy.maslov@everbridge.com Sergiy M
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated: