[KAFKA-130] Create example WriteModelStrategy Created: 27/Jul/20  Updated: 26/Apr/21  Resolved: 16/Oct/20

Status: Closed
Project: Kafka Connector
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Task
Priority: Major - P3
Reporter: Ross Lawley
Assignee: Ross Lawley
Resolution: Done
Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Depends
Related
related to KAFKA-87 UpdateOneStrategy Closed

 Description   

Provide an example showing how to implement a custom WriteModelStrategy.



 Comments   
Comment by Diego Díez [ 26/Apr/21 ]

Does this example still work in 1.5.0? We are trying to implement a custom WriteModelStrategy. When we return null to indicate a no-op, we get a DataException ("Could not build the WriteModel") caused by a NullPointerException.

Could it be solved by using Optional.ofNullable instead of Optional.of here? https://github.com/mongodb/mongo-kafka/blob/621394f2197e31e0b6b07d8390bf6ee40e8cd501/src/main/java/com/mongodb/kafka/connect/sink/MongoProcessedSinkRecordData.java#L91
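
For reference, the difference between the two factory methods can be shown with a minimal standalone sketch. The class and method names (OptionalNullDemo, ofThrowsOnNull, wrapNullable) are hypothetical, and this is not the connector's code; it only demonstrates the standard java.util.Optional behaviour the question relies on. Whether the linked line is the actual cause of the exception is not confirmed here.

```java
import java.util.Optional;

/** Standalone illustration only; this is not the connector's code. */
final class OptionalNullDemo {

  // Optional.of rejects null: it throws a NullPointerException.
  static boolean ofThrowsOnNull() {
    try {
      Optional.of((Object) null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // Optional.ofNullable maps null to an empty Optional instead of throwing.
  static <T> Optional<T> wrapNullable(final T value) {
    return Optional.ofNullable(value);
  }

  public static void main(final String[] args) {
    System.out.println("Optional.of(null) throws NPE: " + ofThrowsOnNull());
    System.out.println("Optional.ofNullable(null) is present: " + wrapNullable(null).isPresent());
  }
}
```

With ofNullable, a null result from a strategy would surface as an empty Optional that the caller can treat as "nothing to write", rather than as an exception.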

Comment by Ross Lawley [ 16/Oct/20 ]

Below is an example of a custom WriteModelStrategy for change stream events:

package example;
 
import org.apache.kafka.connect.errors.DataException;
 
import org.bson.BsonDocument;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;
 
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
 
/**
 * A custom write model strategy
 *
 * This example takes the 'fullDocument' field from a change stream and creates a
 * ReplaceOne operation.
 */
public class CustomWriteModelStrategy implements WriteModelStrategy {
 
  private static final String ID = "_id";

  @Override
  public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
    BsonDocument changeStreamDocument = document.getValueDoc()
        .orElseThrow(() -> new DataException("Missing value document"));
 
    // Fall back to an empty document when the event has no 'fullDocument' field.
    BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
    if (fullDocument.isEmpty()) {
      return null; // Return null to indicate a no-op.
    }
 
    return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
  }
}

Compile this class and add it to the classpath or plugin path of the Kafka Connect workers (see https://docs.confluent.io/current/connect/managing/community.html).
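
Once the class is available to the workers, a sink configuration along the following lines should select it. This is only a sketch: the connector name, topic, connection URI, database and collection are placeholders, and it assumes the sink connector's writemodel.strategy property is given the fully qualified class name.

```properties
# Placeholder values throughout; adjust to your deployment.
name=mongo-sink-custom-strategy
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=example.topic
connection.uri=mongodb://localhost:27017
database=exampleDb
collection=exampleCollection
# Fully qualified name of the custom strategy class from the example above.
writemodel.strategy=example.CustomWriteModelStrategy
```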
