[KAFKA-224] Create new Sink Connector for bucket pattern Created: 19/May/21  Updated: 04/May/22  Resolved: 18/Oct/21

Status: Closed
Project: Kafka Connector
Component/s: Sink
Affects Version/s: None
Fix Version/s: None

Type: New Feature Priority: Unknown
Reporter: Naresh Maharaj (Inactive) Assignee: Robert Walters
Resolution: Duplicate Votes: 0
Labels: internal-user, size-small
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

All


Attachments: Java Source File MongoSinkTopicConfig.java     Java Source File UpdateOneBusinessKeyBucketStrategy.java    
Issue Links:
Documented
Duplicate
Documentation Changes: Needed
Documentation Changes Summary:

The usage and settings for the UpdateOneBusinessKeyBucketStrategy are listed below.


 Description   

Takes a document, or an array of documents, from a path in the source record and buckets them into an array field in the destination document. It also adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to the destination document.

Set the following configuration:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=businessKeyField
document.id.strategy.partial.value.projection.type=AllowList

writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyBucketStrategy
writemodel.strategy.bucketing.source.field=dataPayLoadField
writemodel.strategy.bucketing.source.field.multiple=false
writemodel.strategy.bucketing.target.array=bucketField
writemodel.strategy.bucketing.target.array.size=3
writemodel.strategy.bucketing.target.array.unique=true
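
The bucketing behaviour described above can be sketched in memory as follows. This is only an illustration of the bucket pattern, not the code in the attached UpdateOneBusinessKeyBucketStrategy.java. The class name BucketPatternSketch and its methods are hypothetical, and it assumes that target.array.size caps the number of elements per bucket, that a full bucket causes a new one to be opened, and that target.array.unique skips a payload already present in the open bucket without touching _modifiedTS.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory illustration of the bucket pattern; not the connector's code. */
class BucketPatternSketch {
    private final int maxArraySize; // assumed meaning of ...bucketing.target.array.size
    private final boolean unique;   // assumed meaning of ...bucketing.target.array.unique
    // business key -> buckets for that key, oldest first
    private final Map<String, List<Map<String, Object>>> bucketsByKey = new LinkedHashMap<>();

    BucketPatternSketch(int maxArraySize, boolean unique) {
        this.maxArraySize = maxArraySize;
        this.unique = unique;
    }

    /** Adds one payload to the newest bucket for the key, opening a new bucket when needed. */
    Map<String, Object> add(String businessKey, Object payload, Instant now) {
        List<Map<String, Object>> forKey =
                bucketsByKey.computeIfAbsent(businessKey, k -> new ArrayList<>());
        Map<String, Object> open = forKey.isEmpty() ? null : forKey.get(forKey.size() - 1);

        // Assumption: with unique=true a payload already in the open bucket is skipped.
        if (open != null && unique && items(open).contains(payload)) {
            return open;
        }
        // No bucket yet, or the newest one is full: start a new bucket document.
        if (open == null || items(open).size() >= maxArraySize) {
            open = new LinkedHashMap<>();
            open.put("businessKey", businessKey);
            open.put("_insertedTS", now); // set once, when the bucket is created
            open.put("bucketField", new ArrayList<Object>());
            forKey.add(open);
        }
        items(open).add(payload);
        open.put("_modifiedTS", now); // refreshed on every change to the bucket
        return open;
    }

    /** Returns the buckets recorded for a business key, oldest first. */
    List<Map<String, Object>> bucketsFor(String businessKey) {
        return bucketsByKey.getOrDefault(businessKey, new ArrayList<>());
    }

    /** Returns the array field of a bucket document. */
    @SuppressWarnings("unchecked")
    static List<Object> items(Map<String, Object> bucket) {
        return (List<Object>) bucket.get("bucketField");
    }

    public static void main(String[] args) {
        BucketPatternSketch sketch = new BucketPatternSketch(3, true);
        Instant now = Instant.now();
        for (String payload : new String[] {"m1", "m2", "m3", "m4"}) {
            sketch.add("1", payload, now);
        }
        // Print each bucket's array so the split across buckets is visible.
        for (Map<String, Object> bucket : sketch.bucketsFor("1")) {
            System.out.println(items(bucket));
        }
    }
}
```

In a real sink the same decisions would be made by the database as part of an update on the document matched by the business key; the map here only stands in for the collection so the rules can be read in one place.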



 Comments   
Comment by Naresh Maharaj (Inactive) [ 19/May/21 ]

Below is an example in which 3 messages with the same business key were received and all 3 were bucketed into a single document:

{
    "_id" : ObjectId("609d094b9fc9182fc4f5f8c5"), 
    "businessKey" : {
        "TxId" : "1"
    }, 
    "_insertedTS" : ISODate("2021-05-13T11:11:07.158+0000"), 
    "_modifiedTS" : ISODate("2021-05-13T11:14:10.410+0000"), 
    "accountId" : "IBAN0000000031673629402517", 
    "messages" : [
        {
            "k" : "PAC0008", 
            "v" : {
                "mdInsertDtTm" : ISODate("2021-05-12T13:59:27.000+0000"), 
                "xmlToJson" : {
                    "date" : "2021-05-12T14:59:27", 
                    "amount" : 1009.23
                }
            }
        }, 
        {
            "k" : "PAC0007", 
            "v" : {
                "mdInsertDtTm" : ISODate("2021-05-12T13:59:27.000+0000"), 
                "xmlToJson" : {
                    "date" : "2021-05-12T14:59:27", 
                    "amount" : -3009.23
                }
            }
        }, 
        {
            "k" : "PAC0007", 
            "v" : {
                "mdInsertDtTm" : ISODate("2021-05-12T13:59:27.000+0000"), 
                "xmlToJson" : {
                    "date" : "2021-05-12T14:59:27", 
                    "amount" : -1009.23
                }
            }
        }
    ], 
    "metadata" : {
        "mdInsertDtTm" : "2021-05-12T14:59:27"
    }, 
    "tranactionId" : "1"
}

Generated at Thu Feb 08 09:05:53 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.