[KAFKA-159] Support dynamic collection naming strategies Created: 21/Sep/20  Updated: 28/Oct/23  Resolved: 08/Feb/21

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: None
Fix Version/s: 1.4.0

Type: Improvement Priority: Major - P3
Reporter: Ross Lawley Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Documented
Related
related to KAFKA-186 Support dot notation lookups in dynam... Closed
Epic Link: KAFKA-47
Documentation Changes: Needed
Documentation Changes Summary:

Dynamic Namespace Mapping for the Sink

Added a new interface `com.mongodb.kafka.connect.sink.namespace.mapping.NamespaceMapper` with a `getNamespace` method.
Implementations can use either the raw `SinkRecord` or the `SinkDocument` to determine the correct `MongoNamespace` to sink the data to.

The sink connector includes two implementations:

  • `DefaultNamespaceMapper`
    Uses the configured database, plus the configured collection or, if no collection is configured, the topic name, as the namespace.
  • `FieldPathNamespaceMapper`
    Uses a string from either the key or value document as the database or collection name.
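
As a rough illustration of the two behaviours described above, the following is a minimal sketch and not the connector's actual code. The class `NamespaceResolutionSketch`, its method names, and the use of plain strings and a `Map` in place of `MongoNamespace` and `SinkDocument` are all assumptions made for this example. Falling back to a default collection when the field is missing or not a string is likewise an assumption about what a non-erroring mapper might do.

```java
import java.util.Map;

// Hypothetical, simplified model of the two mapping behaviours.
// These are NOT the connector's classes; namespaces are plain "db.collection" strings.
final class NamespaceResolutionSketch {

  // Default behaviour: the configured database, plus the configured collection,
  // or the topic name when no collection is configured.
  static String defaultNamespace(String database, String collection, String topic) {
    String target = (collection == null || collection.isEmpty()) ? topic : collection;
    return database + "." + target;
  }

  // Field-path behaviour: use a string field from the document as the collection name.
  // Assumption: when the field is missing or not a string, either throw
  // (errorIfInvalid = true) or fall back to the supplied default collection.
  static String fieldPathNamespace(
      String database,
      String defaultCollection,
      Map<String, Object> document,
      String collectionField,
      boolean errorIfInvalid) {
    Object value = document.get(collectionField);
    if (value instanceof String) {
      return database + "." + value;
    }
    if (errorIfInvalid) {
      throw new IllegalArgumentException(
          "Field '" + collectionField + "' is missing or is not a string");
    }
    return database + "." + defaultCollection;
  }

  public static void main(String[] args) {
    // No collection configured, so the topic name is used.
    System.out.println(defaultNamespace("shop", "", "orders")); // shop.orders
    // The "type" field in the value document selects the collection.
    Map<String, Object> value = Map.<String, Object>of("type", "refunds");
    System.out.println(fieldPathNamespace("shop", "orders", value, "type", false)); // shop.refunds
  }
}
```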

The following configuration options can help configure namespace mapping:

  • `namespace.mapper`
    The class that determines the namespace to write the sink data to. By default, the namespace is based on the 'database' configuration and either the 'collection' configuration or, if that is not set, the topic name. Users can provide their own implementations of the 'NamespaceMapper' interface.
    Default: com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper.
  • `namespace.mapper.error.if.invalid`
    Throw an error if the mapped field is missing or is an invalid BSON type. Defaults to false. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
  • `namespace.mapper.key.database.field`
    The key field to use as the destination database name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
  • `namespace.mapper.key.collection.field`
    The key field to use as the destination collection name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
  • `namespace.mapper.value.database.field`
    The value field to use as the destination database name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
  • `namespace.mapper.value.collection.field`
    The value field to use as the destination collection name. Requires the 'namespace.mapper' to be set to 'com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper'.
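
To show how the options above might fit together, here is a minimal sketch that assembles them as a Java `Map` of sink properties. The property keys and the mapper class name come from the list above. The class `FieldPathMapperConfigSketch` and the field names `tenant` and `eventType` are hypothetical, chosen only for illustration. A real connector configuration would need further settings, such as the connection and topic options, which are omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds only the namespace-mapping part of a sink configuration.
final class FieldPathMapperConfigSketch {

  static Map<String, String> namespaceMappingProperties() {
    Map<String, String> props = new LinkedHashMap<>();
    // Select the field-path mapper instead of the default mapper.
    props.put(
        "namespace.mapper",
        "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper");
    // Take the database name from the "tenant" field of the value document (hypothetical field).
    props.put("namespace.mapper.value.database.field", "tenant");
    // Take the collection name from the "eventType" field of the value document (hypothetical field).
    props.put("namespace.mapper.value.collection.field", "eventType");
    // Fail instead of continuing silently when a mapped field is missing or has an invalid type.
    props.put("namespace.mapper.error.if.invalid", "true");
    return props;
  }

  public static void main(String[] args) {
    namespaceMappingProperties().forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```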

 Description   

Currently, the Sink connector decides where to save data based on each topic's database and collection configuration, so all data from a topic maps to a single collection.

Some users would like to choose the destination collection dynamically, based on a value in the SinkRecord. Examples could be:

  • Use a date-based field as the basis for the collection name, to allow bucketing of data in MongoDB
  • Use a string field directly for the collection naming strategy

Users should also be able to write their own collection naming strategy class and include it to allow for full customizability.
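
A custom, date-based naming strategy of the kind requested here could look roughly like the sketch below. It is not the connector's API: `NamespaceMapperSketch` is a stand-in interface invented for this example, using a topic string and a `Map` where a real implementation would work with `SinkRecord`, `SinkDocument`, and `MongoNamespace`. The class `MonthlyBucketMapper`, the `createdAt` field, the epoch-millisecond timestamp format, and the `topic_yyyy_MM` naming scheme are also assumptions.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Stand-in for a namespace mapper interface (hypothetical, simplified types).
interface NamespaceMapperSketch {
  String getNamespace(String topic, Map<String, Object> valueDocument);
}

// Buckets records into one collection per month, based on a timestamp field.
final class MonthlyBucketMapper implements NamespaceMapperSketch {
  private static final DateTimeFormatter MONTH =
      DateTimeFormatter.ofPattern("yyyy_MM").withZone(ZoneOffset.UTC);

  private final String database;
  private final String timestampField;

  MonthlyBucketMapper(String database, String timestampField) {
    this.database = database;
    this.timestampField = timestampField;
  }

  @Override
  public String getNamespace(String topic, Map<String, Object> valueDocument) {
    Object timestamp = valueDocument.get(timestampField);
    if (!(timestamp instanceof Long)) {
      // Assumption: fall back to a collection named after the topic when no usable timestamp exists.
      return database + "." + topic;
    }
    // Interpret the value as epoch milliseconds and derive a UTC month bucket from it.
    String bucket = MONTH.format(Instant.ofEpochMilli((Long) timestamp));
    return database + "." + topic + "_" + bucket;
  }

  public static void main(String[] args) {
    NamespaceMapperSketch mapper = new MonthlyBucketMapper("analytics", "createdAt");
    // 1612742400000L is 2021-02-08T00:00:00Z.
    Map<String, Object> value = Map.<String, Object>of("createdAt", 1612742400000L);
    System.out.println(mapper.getNamespace("events", value)); // analytics.events_2021_02
  }
}
```

Deriving the bucket in UTC keeps the collection name independent of the worker's local time zone, which matters when several workers write the same topic.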



 Comments   
Comment by Githook User [ 08/Feb/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Added Dynamic Namespace Mapping for the Sink

Added a new interface `NamespaceMapper` with a `getNamespace` method.
Implementations can use either the raw `SinkRecord` or the `SinkDocument` to
determine the correct `MongoNamespace` to sink the data to.

The sink connector includes two implementations:

  • `DefaultNamespaceMapper`
    Uses the configured database, plus the configured collection or, if no collection is configured, the topic name, as the namespace.
  • `FieldPathNamespaceMapper`
    Uses a string from either the key or value document as the database or collection name.

KAFKA-159
Branch: master
https://github.com/mongodb/mongo-kafka/commit/3d799f189da04f6f35217151a7a704ec9385ae10
