[KAFKA-269] Write mode to Embed Entities Created: 08/Dec/21  Updated: 17/Jul/23

Status: Backlog
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: New Feature Priority: Major - P3
Reporter: Juan Soto (Inactive) Assignee: Unassigned
Resolution: Unresolved Votes: 1
Labels: internal-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Attachments: Java Source File UpdateOneBusinessKeyArrayPipeline.java    

 Description   

Hello team,

At the moment the Kafka connector only works with relational-style models, or with models where multiple streams can be merged into one document.

 

If we want to work with a model like the one in the attached picture, there is no write model that supports embedded entities.

 

{
_id:,
orderNumber:,
orderDate:,
lineItems:[
           {lineNumber:,
             product:{}}]
}

Is an array the best approach for Kafka Connect? When you design a Kafka stream you need to make the flows idempotent (as much as possible). If Kafka rebalances, it is possible that your connector pushes the same item twice, and with an array the item would then be stored twice.

For this reason, the best schema in my opinion is one where the line items are keyed by line number:

 

{
_id:,
orderNumber:,
orderDate:,
lineItems:{  1:{lineNumber:1,product:{}},
             2:{lineNumber:2,}
}
}
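To illustrate the idempotency argument, here is a minimal sketch in plain JavaScript (no MongoDB involved). The helper names pushLineItem and putLineItem and the order number "A-1" are made up for this example; they are not part of the connector.

```javascript
// Array layout: a redelivered message appends a second copy of the line item.
function pushLineItem(order, item) {
  return { ...order, lineItems: [...(order.lineItems || []), item] };
}

// Keyed layout: the line number is the key, so a redelivery overwrites the same entry.
function putLineItem(order, item) {
  return {
    ...order,
    lineItems: { ...(order.lineItems || {}), [item.lineNumber]: item },
  };
}

const item = { lineNumber: 1, product: {} };

let asArray = { orderNumber: "A-1" }; // hypothetical order
asArray = pushLineItem(asArray, item);
asArray = pushLineItem(asArray, item); // simulated redelivery after a rebalance

let asMap = { orderNumber: "A-1" };
asMap = putLineItem(asMap, item);
asMap = putLineItem(asMap, item); // same redelivery, same key

console.log(asArray.lineItems.length);            // 2
console.log(Object.keys(asMap.lineItems).length); // 1
```

With the keyed layout, replaying the same message leaves the document unchanged, which is the property we want from the write model.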

The upsert operation that manages this schema is the following. In this example the embedded object has id 12 and is stored under the field "item".

 

 

var bulk = db.oli.initializeUnorderedBulkOp();
bulk.find({}).upsert().updateOne(
   [
   // Stage 1: make sure the ids array exists.
   {$set:{ids: {$ifNull:["$ids",[]]}}},
   // Stage 2: record id 12 only once and write the embedded object under item.12.
   { $set: {
       modified: "$$NOW",
       ids: { $cond:{if:{$in:[12,"$ids"]},then:"$ids",else:{$concatArrays:["$ids",[12]]}}  } ,
       "item.12":{num:122, id:12}
   }  },
   // Stage 3: when ids reaches the size limit (4), remember the oldest id
   // in "last" and expand item into an array of {k, v} pairs in "object".
   {
       $addFields: {
           last:{
               $cond:{
                   if:{$eq:[{$size:"$ids"},4]},
                   then: {$toString:{$first: "$ids" }},
                   else: ""}
           },
           object: {$cond:{
                   if:{$eq:[{$size:"$ids"},4]},
                   then: {$objectToArray:"$item"},
                   else: ""} }
       }
   },
   // Stage 4: drop the entry whose key is the oldest id.
   {
       $set:{object:{
           $cond:{
               if:{$isArray:"$object"},
               then: {
                   $filter:{
                           input:"$object",
                           as:"item",
                           cond:{$ne:["$$item.k","$last"]}
                   }
               },
               else :""
           }
       }}
   },
   // Stage 5: rebuild item from the filtered pairs and keep only the 3 newest ids.
   {$set:{
       item:{
           $cond:{
               if:{$isArray:"$object"},
               then:{$arrayToObject:"$object"},
               else:"$item"
           }
       },
       //last:"",
       ids:{$slice:["$ids",-3]}}
   }
   ]
);
bulk.execute();
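
The effect of the pipeline on a single document can be modelled in plain JavaScript. This is only a sketch for reasoning about the behaviour: applyUpsert and MAX_IDS are names invented here, no MongoDB call is made, and the helper fields modified, last and object that the real pipeline also writes are left out.

```javascript
const MAX_IDS = 4; // mirrors the hardcoded size check in the pipeline

// Plain-JavaScript model of the five pipeline stages for one incoming object.
function applyUpsert(doc, id, value) {
  // Stage 1: default ids to an empty array.
  let ids = doc.ids ? [...doc.ids] : [];
  // Stage 2: record the id once and write the entry under item.<id>.
  if (!ids.includes(id)) ids.push(id);
  let item = { ...(doc.item || {}), [id]: value };
  // Stages 3-4: when the limit is reached, drop the entry of the oldest id.
  if (ids.length === MAX_IDS) {
    const last = String(ids[0]);
    item = Object.fromEntries(
      Object.entries(item).filter(([key]) => key !== last)
    );
  }
  // Stage 5: keep only the newest MAX_IDS - 1 ids.
  ids = ids.slice(-(MAX_IDS - 1));
  return { ...doc, ids, item };
}

// Id 12 is delivered twice to simulate a replay after a rebalance.
let doc = {};
for (const id of [10, 11, 12, 12, 13]) {
  doc = applyUpsert(doc, id, { id: id, num: id * 10 });
}

console.log(doc.ids);               // 11, 12, 13
console.log(Object.keys(doc.item)); // '11', '12', '13'
```

The replayed id 12 is stored once, and when the fourth distinct id arrives the oldest entry (id 10) is evicted, so the embedded collection never holds more than three entries.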

At the moment the new write mode does not support any new connector properties, but these would be some of them:

  • Name of the array. At the moment this is read from the Kafka message, from the field named "item".
  • Name of the id field of the embedded object. At the moment this is hardcoded to "id".
  • Array size. At the moment the size is hardcoded to 4.
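
The three properties above could drive the pipeline roughly as follows. This is a sketch only: the option names arrayField, idField and maxSize are hypothetical (no such connector properties exist yet), and buildPipeline just builds the pipeline document; it has not been run against a server.

```javascript
// Builds the update pipeline from three hypothetical options instead of
// the hardcoded "item", "id" and 4.
function buildPipeline(options, value) {
  const { arrayField = "item", idField = "id", maxSize = 4 } = options;
  const id = value[idField];
  const ref = "$" + arrayField; // field path of the embedded collection
  const isFull = { $eq: [{ $size: "$ids" }, maxSize] };
  return [
    { $set: { ids: { $ifNull: ["$ids", []] } } },
    { $set: {
        modified: "$$NOW",
        ids: { $cond: { if: { $in: [id, "$ids"] }, then: "$ids",
                        else: { $concatArrays: ["$ids", [id]] } } },
        [arrayField + "." + id]: value,
    } },
    { $addFields: {
        last: { $cond: { if: isFull, then: { $toString: { $first: "$ids" } }, else: "" } },
        object: { $cond: { if: isFull, then: { $objectToArray: ref }, else: "" } },
    } },
    { $set: { object: { $cond: {
        if: { $isArray: "$object" },
        then: { $filter: { input: "$object", as: "entry",
                           cond: { $ne: ["$$entry.k", "$last"] } } },
        else: "",
    } } } },
    { $set: {
        [arrayField]: { $cond: { if: { $isArray: "$object" },
                                 then: { $arrayToObject: "$object" }, else: ref } },
        ids: { $slice: ["$ids", -(maxSize - 1)] },
    } },
  ];
}

// Example with the order schema from this ticket.
const pipeline = buildPipeline(
  { arrayField: "lineItems", idField: "lineNumber", maxSize: 4 },
  { lineNumber: 12, product: {} }
);
console.log(JSON.stringify(pipeline[1], null, 2));
```

Deriving the slice length from maxSize keeps the size check and the trimming of ids in step, which are two separate hardcoded values (4 and -3) today.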

 

This new write mode reuses some of the functionality provided by UpdateOneBusinessKeyTimestampStrategy.

Regards,

Juan

 

 

 


Generated at Thu Feb 08 09:05:59 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.