[KAFKA-40] Support schema for Source connector Created: 09/Jul/19  Updated: 22/Sep/21  Resolved: 08/Sep/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: 1.3.0

Type: Epic Priority: Major - P3
Reporter: Scott L'Hommedieu (Inactive) Assignee: Ross Lawley
Resolution: Done Votes: 12
Labels: FY21Q2
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Depends
is depended on by KAFKA-48 Can't set custom message key Closed
Duplicate
is duplicated by KAFKA-33 Add Google Protocol buffers support Closed
Related
Case:
Start date:
End date:
Calendar Time: 7 weeks, 4 days
Detailed Project Statuses:

Engineer(s): Ross

2020-08-25:

  • The last piece of work is waiting on a review from Durran and Jeff.

2020-07-28: Initial target of 2020-08-21 (6 Weeks)
This project captures most of the work for the Kafka Connector 1.3.0 release.

Goals for the next month:

  • Support output to schema
  • Allow configuration of the Source Record Key
  • Support auto generated schema structs
  • Allow namespace based configuration overrides

Completed work:

  • Add support to output bson from the source


 Description   
Epic Summary

Summary

Currently the source connector converts all change stream documents to extended JSON strings before publishing to a topic.
This requires users to use the org.apache.kafka.connect.storage.StringConverter converter, which has space / efficiency costs in Kafka (especially when it comes to storage). There are various other converters that allow the Kafka topic subscriber to receive events in the format of their choice.
The MongoDB Source Connector should allow users to configure a number of converters, such as io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.json.JsonConverter or org.apache.kafka.connect.converters.ByteArrayConverter.
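
For illustration, a source configuration that pairs schema output with a schema-aware converter might look roughly like the sketch below. The output.* property names are assumptions about the eventual configuration surface rather than anything defined in this ticket, and the connection and Schema Registry URLs are placeholders.

{
  "name": "mongo-source-with-schema",
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "connection.uri": "mongodb://localhost:27017",
  "database": "test",
  "collection": "test",
  "output.format.value": "schema",
  "output.schema.infer.value": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}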

Change stream events have a defined schema, but contain a number of optional fields: 

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
} 

 

 The main complexity will be deciding how to handle dynamic document data: fullDocument, documentKey and updateDescription.updatedFields.
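
One way to picture the trade-off is an Avro-style value schema in which the fixed fields are typed and the dynamic parts (fullDocument, documentKey, updateDescription.updatedFields) fall back to optional JSON strings. The sketch below is only one possible mapping under that assumption, not a schema defined by this ticket:

{
  "type": "record",
  "name": "ChangeStreamEvent",
  "fields": [
    { "name": "_id", "type": "string" },
    { "name": "operationType", "type": ["null", "string"], "default": null },
    { "name": "fullDocument", "type": ["null", "string"], "default": null },
    { "name": "ns", "type": ["null", { "type": "record", "name": "Namespace", "fields": [
        { "name": "db", "type": "string" },
        { "name": "coll", "type": ["null", "string"], "default": null } ] }], "default": null },
    { "name": "to", "type": ["null", "Namespace"], "default": null },
    { "name": "documentKey", "type": ["null", "string"], "default": null },
    { "name": "updateDescription", "type": ["null", { "type": "record", "name": "UpdateDescription", "fields": [
        { "name": "updatedFields", "type": ["null", "string"], "default": null },
        { "name": "removedFields", "type": ["null", { "type": "array", "items": "string" }], "default": null } ] }], "default": null },
    { "name": "clusterTime", "type": ["null", "string"], "default": null },
    { "name": "txnNumber", "type": ["null", "long"], "default": null },
    { "name": "lsid", "type": ["null", "string"], "default": null }
  ]
}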

 

A secondary aim will be to ensure users can access data from the ChangeStream in a way that is easily consumable outside of MongoDB: KAFKA-99



 Comments   
Comment by Jeffrey Cheak [ 25/Aug/20 ]

Can you clarify which Jeff?

Comment by Hamid Jawaid [ 13/Jul/20 ]

Hi Ross,

Does this mean that, up to 1.2, the MongoDB Source Connector cannot have a key other than the default one (the change stream event ID), as ValueToKey won't work due to the error mentioned in this Epic?

Default Key: {"_id":{"_data":"<unique-changestream-event-id->"}}

Or can we still have a custom key, but need to write our own custom SMT/Converter?

E.g. I have a nested document and want to use one of its (unique) field values as the partition key so that the ordering of updates is maintained. My key would be like: fullDocument->userId

Is it possible to use a custom field (e.g. fullDocument->userId) as the partition key using a custom SMT/Converter in the MongoDB Source Connector?
I am flexible on the key/value converter (e.g. StringConverter/JsonConverter/AvroConverter) as long as I can use my custom field as the partition key.
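
For concreteness, the kind of transform chain I have in mind is sketched below (untested, and I assume it can only work once the source emits a Struct value rather than a String, which is what this Epic is about):

{
  "transforms": "FlattenValue,CopyKey,ExtractKey",
  "transforms.FlattenValue.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.FlattenValue.delimiter": ".",
  "transforms.CopyKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.CopyKey.fields": "fullDocument.userId",
  "transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.ExtractKey.field": "fullDocument.userId"
}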

Comment by Ludovic Dussart [ 19/May/20 ]

Related to this limitation, it is not possible to use Kafka Simple Message Transformations (SMTs), because an SMT needs to manipulate a Struct (not a String).

Even if io.confluent.connect.avro.AvroConverter is used for the key or value, the type is declared as String, resulting in a Kafka exception: 

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field replacement], found: java.lang.String
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.ReplaceField.applyWithSchema(ReplaceField.java:150)
    at org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:129)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

Related source connector configuration:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "tasks.max": "1",
  "change.stream.full.document": "updateLookup",
  "collection": "currencies",
  "topic.prefix": "xxx",
  "database": "xxx",
  "connection.uri": "xxx",
  "name": "mongo-source-currencies",
  "copy.existing": "true",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": "RenameField",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "fullDocument.fromDate.$date:fullDocument.fromDate.date"
}

 
