  Kafka Connector / KAFKA-360

Tombstones incompatible with downstream integration

    • Type: Bug
    • Resolution: Fixed
    • Priority: Unknown
    • Fix Version/s: 1.11.0
    • Affects Version/s: None
    • Component/s: None

      Added a configuration (change.stream.document.key.as.key) to use the document key as the SourceRecord key.

      This is potentially a breaking change as the newly added configuration
      change.stream.document.key.as.key defaults to true.

      Previously, the resume token was used as the source record key, but that
      limits the usefulness of tombstones, both for topic compaction and for
      downstream consumers.

      Not all events relate to documents (e.g. a collection drop), so the
      connector falls back to the resume token for any change stream event that
      has no documentKey.

      As such, this is considered both an improvement and a bug fix.

      Set change.stream.document.key.as.key to false to revert to the previous
      behaviour.
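For illustration, the documentKey-then-resume-token fallback described above can be sketched in plain Java. This is a simplified model, not the connector's actual code: a Map stands in for the BsonDocument types, and the key values ("orderId42", "resumeToken1", …) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the key-selection fallback: prefer the change event's
// documentKey._id, and fall back to the event's _id (the resume token) for
// events that carry no documentKey (e.g. a collection drop).
public class KeySelectionSketch {

  public static Object selectSourceKey(Map<String, Object> changeEvent) {
    Object documentKey = changeEvent.get("documentKey");
    if (documentKey instanceof Map) {
      Object id = ((Map<?, ?>) documentKey).get("_id");
      if (id != null) {
        return id; // document-level key: tombstones can match earlier records
      }
    }
    return changeEvent.get("_id"); // resume token fallback
  }

  public static void main(String[] args) {
    Map<String, Object> insertEvent = new HashMap<>();
    insertEvent.put("_id", "resumeToken1"); // resume token of the event
    Map<String, Object> documentKey = new HashMap<>();
    documentKey.put("_id", "orderId42"); // the document's own id
    insertEvent.put("documentKey", documentKey);

    Map<String, Object> dropEvent = new HashMap<>(); // no documentKey present
    dropEvent.put("_id", "resumeToken2");

    System.out.println(selectSourceKey(insertEvent)); // orderId42
    System.out.println(selectSourceKey(dropEvent));   // resumeToken2
  }
}
```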


      Hi there,

      Following your release 1.9.0, I've configured a source connector with `publish.full.document.only.tombstone.on.delete` to be able to publish tombstone records to a Kafka compacted topic.
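For reference, a configuration along these lines reproduces the setup (standalone properties format; the connection, database, and collection values are illustrative, not taken from this ticket):

```properties
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=mydb
collection=orders
# Publish only the full document, without the change event envelope...
publish.full.document.only=true
# ...and emit a tombstone (null-value) record on delete events.
publish.full.document.only.tombstone.on.delete=true
```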

      It doesn't work as expected because the BsonDocument key is created from
      the change stream ObjectId instead of the DocumentId, so the SourceRecord
      is produced with a key that is unknown to downstream Kafka consumers and
      does not match the key of the original record.

      This change stream ObjectId bears no relation to the id used on the Kafka
      records, so the tombstone cannot be matched to an existing record (the
      keys differ), which breaks tombstone functionality.
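To make the key mismatch concrete, here is a minimal, self-contained illustration. A Map models a compacted topic's "latest value per key" view, and the keys are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal model of log compaction: a compacted topic keeps the latest value
// per key, and a null value (tombstone) deletes the key.
public class CompactionSketch {

  public static void applyRecord(Map<String, String> topic, String key, String value) {
    if (value == null) {
      topic.remove(key); // tombstone: compaction eventually drops the key
    } else {
      topic.put(key, value); // newer value replaces the older one
    }
  }

  public static void main(String[] args) {
    Map<String, String> topic = new LinkedHashMap<>();

    // Record originally produced with the document's own id as its key.
    applyRecord(topic, "orderId42", "{\"status\":\"shipped\"}");

    // Tombstone keyed by the change stream ObjectId (resume token): no match,
    // so the original record survives compaction.
    applyRecord(topic, "resumeTokenXYZ", null);
    System.out.println(topic.containsKey("orderId42")); // true

    // Tombstone keyed by the document id: the record is deleted as intended.
    applyRecord(topic, "orderId42", null);
    System.out.println(topic.containsKey("orderId42")); // false
  }
}
```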

      I ended up cloning the repository and adding a configuration `documentid.on.tombstones` to put the DocumentId instead of the change stream ObjectId on tombstone events.

      If you agree, I can open a PR with this functionality, as follows:

       

      // StartedMongoSourceTask.java:250
      BsonDocument keyDocument;
      if (isTombstoneEvent && tombstoneWithDocumentId) {
        // Tombstone: key the record by the deleted document's _id so it matches
        // the key of the records previously produced for that document.
        keyDocument =
            new BsonDocument(
                ID_FIELD,
                changeStreamDocument.get(DOCUMENT_KEY_FIELD).asDocument().get(ID_FIELD));
      } else {
        // Existing behaviour: either the full change stream document (schema
        // output format) or its _id (the resume token) becomes the key.
        keyDocument =
            sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA
                ? changeStreamDocument
                : new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
      }

       

       

      With this workaround I was able to integrate with Kafka compacted topics.

      Are you aware of this misbehavior? Or do you suggest another way of
      producing these tombstones with the relevant keys?

      By the way, the documentation on your website has the wrong config name:

      • source code: publish.full.document.only.tombstone.on.delete
      • website: publish.full.document.only.tombstones.on.delete

      ref. https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Goncalo Pinho (goncalopinho@hotmail.com)
            Votes: 0
            Watchers: 7
