package example;
|
|
import org.apache.kafka.connect.errors.DataException;
|
|
import org.bson.BsonDocument;
|
import com.mongodb.client.model.Filters;
|
import com.mongodb.client.model.ReplaceOneModel;
|
import com.mongodb.client.model.WriteModel;
|
|
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
|
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
|
|
/**
|
* A custom write model strategy
|
*
|
* This example takes the 'fullDocument' field from a change stream and creates a
|
* ReplaceOne operation.
|
*/
|
public class CustomWriteModelStrategy implements WriteModelStrategy {
|
|
private static String ID = "_id";
|
@Override
|
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
|
BsonDocument changeStreamDocument = document.getValueDoc()
|
.orElseThrow(() -> new DataException("Missing value document"));
|
|
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
|
if (fullDocument.isEmpty()) {
|
return null; // Return null to indicate no op.
|
}
|
|
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
|
}
|
}
|