Kafka Connector / KAFKA-269

Write mode to Embed Entities


Details

    • Type: New Feature
    • Resolution: Unresolved
    • Priority: Major - P3

    Description

      Hello team,

      At the moment, Kafka Connect only works with relational models, or with models where multiple streams can be merged into one document.

       

      If we want to work with a model like the one in the attached picture, there is no write model that supports embedded entities.

       

      {
      _id:,
      orderNumber:,
      orderDate:,
      lineItems:[
                 {lineNumber:,
                  product:{}}]
      }
      

      Is an array the best approach for Kafka Connect? When you design a Kafka stream, you need to make the flows idempotent (as much as possible). If Kafka rebalances, it is possible that your connector pushes the same item twice, and with an array that item would then be embedded twice.

      For this reason, the best schema in my opinion is one where the line items are keyed by line number:

       

      {
      _id:,
      orderNumber:,
      orderDate:,
      lineItems:{  1:{lineNumber:1, product:{}},
                   2:{lineNumber:2,}
      }
      }
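
To illustrate the idempotency argument, here is a minimal sketch in plain JavaScript (no MongoDB involved). The helpers addToArray and addToMap are hypothetical names used only for this illustration; they model what happens when the same message is delivered twice after a rebalance.

```javascript
// Hypothetical helpers modelling the two schemas; not connector code.
function addToArray(order, item) {
  order.lineItems.push(item);               // appends on every delivery
  return order;
}

function addToMap(order, item) {
  order.lineItems[item.lineNumber] = item;  // the same key is overwritten
  return order;
}

const lineItem = { lineNumber: 1, product: {} };
const arrayOrder = { lineItems: [] };
const mapOrder = { lineItems: {} };

// Simulate the same message being delivered twice.
for (let i = 0; i < 2; i++) {
  addToArray(arrayOrder, lineItem);
  addToMap(mapOrder, lineItem);
}

console.log(arrayOrder.lineItems.length);            // 2 (duplicate entry)
console.log(Object.keys(mapOrder.lineItems).length); // 1 (no duplicate)
```

With the keyed schema, a redelivered message rewrites the same key, so the document ends up in the same state.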
      

      The upsert operation that manages this schema is the following.

       

       

      // Upsert one embedded entity (id 12); once 4 ids are tracked, drop the oldest one.
      var bulk = db.oli.initializeUnorderedBulkOp();
      bulk.find({}).upsert().updateOne(
         [
         // Stage 1: make sure the ids array exists.
         {$set:{ids: {$ifNull:["$ids",[]]}}},
         // Stage 2: record the id once and write the entity under its own key.
         { $set: {
             modified: "$$NOW",
             ids: { $cond:{if:{$in:[12,"$ids"]},then:"$ids",else:{$concatArrays:["$ids",[12]]}}  },
             "item.12":{num:122, id:12}
         }  },
         // Stage 3: at 4 ids, remember the oldest id and expand item into a k/v array.
         {
             $addFields: {
                 last:{
                     $cond:{
                         if:{$eq:[{$size:"$ids"},4]},
                         then: {$toString:{$first: "$ids" }},
                         else: ""}
                 },
                 object: {$cond:{
                         if:{$eq:[{$size:"$ids"},4]},
                         then: {$objectToArray:"$item"},
                         else: ""} }
             }
         },
         // Stage 4: remove the oldest entity from the k/v array.
         {
             $set:{object:{
                 $cond:{
                     if:{$isArray:"$object"},
                     then: {
                         $filter:{
                                 input:"$object",
                                 as:"item",
                                 cond:{$ne:["$$item.k","$last"]}
                         }
                     },
                     else :""
                 }
             }}
         },
         // Stage 5: rebuild item from the filtered array and keep the 3 most recent ids.
         {$set:{
             item:{
                 $cond:{
                     if:{$isArray:"$object"},
                     then:{$arrayToObject:"$object"},
                     else:"$item"
                 }
             },
             //last:"",
             ids:{$slice:["$ids",-3]}
         }}
         ]
      );
      bulk.execute();
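
As a rough aid for reading the pipeline, the sketch below models its effect on a single document in plain JavaScript. It is an approximation under stated assumptions, not the connector's implementation: applyEmbed and maxSize are hypothetical names, and the modified, last and object helper fields are left out.

```javascript
// Hypothetical model of the update pipeline's effect on one document.
function applyEmbed(doc, entity, maxSize = 4) {
  const ids = doc.ids ? doc.ids.slice() : [];        // $ifNull: default to []
  const item = Object.assign({}, doc.item);
  if (!ids.includes(entity.id)) ids.push(entity.id); // add the id only once
  item[entity.id] = entity;                          // write under its own key
  if (ids.length === maxSize) {
    delete item[String(ids[0])];                     // drop the oldest entity
  }
  return { ids: ids.slice(-(maxSize - 1)), item: item };
}

let doc = {};
// The last message (id 4) is delivered twice on purpose.
for (const id of [1, 2, 3, 4, 4]) {
  doc = applyEmbed(doc, { id: id, num: id * 10 });
}

console.log(doc.ids);               // [ 2, 3, 4 ]
console.log(Object.keys(doc.item)); // [ '2', '3', '4' ]
```

The repeated delivery of id 4 leaves the document unchanged, and the entity with id 1 is evicted when the fourth id arrives.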
      

      At the moment, the new write mode does not support any new connector properties, but these would be some of them:

      • Name of the array. At the moment this property is read from the Kafka message, from the field named "item".
      • Name of the id field of the embedded object. At the moment this is hardcoded to the value id.
      • Array size. At the moment the size is hardcoded to 4.
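
One possible way to turn these three hardcoded values into settings is sketched below. This is only an assumption about how it could look: buildEmbedPipeline and the option names mapField, idField and maxSize are hypothetical and are not existing connector properties.

```javascript
// Hypothetical helper: builds the update pipeline from the three settings.
function buildEmbedPipeline(entity, options) {
  const { mapField = "item", idField = "id", maxSize = 4 } = options || {};
  const id = entity[idField];
  const mapRef = "$" + mapField;
  const atLimit = { $eq: [{ $size: "$ids" }, maxSize] };
  return [
    { $set: { ids: { $ifNull: ["$ids", []] } } },
    { $set: {
        modified: "$$NOW",
        ids: { $cond: { if: { $in: [id, "$ids"] }, then: "$ids",
                        else: { $concatArrays: ["$ids", [id]] } } },
        [mapField + "." + id]: entity
    } },
    { $addFields: {
        last: { $cond: { if: atLimit,
                         then: { $toString: { $first: "$ids" } }, else: "" } },
        object: { $cond: { if: atLimit,
                           then: { $objectToArray: mapRef }, else: "" } }
    } },
    { $set: { object: { $cond: { if: { $isArray: "$object" },
        then: { $filter: { input: "$object", as: "entry",
                           cond: { $ne: ["$$entry.k", "$last"] } } },
        else: "" } } } },
    { $set: {
        [mapField]: { $cond: { if: { $isArray: "$object" },
                               then: { $arrayToObject: "$object" }, else: mapRef } },
        ids: { $slice: ["$ids", -(maxSize - 1)] }
    } }
  ];
}

// With the defaults this reproduces the hardcoded values from the description.
const pipeline = buildEmbedPipeline({ num: 122, id: 12 });
```

The resulting array could then be passed to updateOne in place of the literal pipeline.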

       

      This new write mode uses some of the functionality provided by UpdateOneBusinessKeyTimestampStrategy.

      Regards,

      Juan

       

       

       


          People

            Assignee: Unassigned
            Reporter: Juan Soto (Inactive), juan.soto@mongodb.com
            Votes: 1
            Watchers: 5
