[KAFKA-208] Error on heartbeat setup with existing SMT Created: 16/Mar/21  Updated: 27/Oct/23  Resolved: 29/Mar/21

Status: Closed
Project: Kafka Connector
Component/s: Source
Affects Version/s: 1.3.0
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Vlad Goldman Assignee: Ross Lawley
Resolution: Works as Designed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related

 Description   

When trying to add a heartbeat to the existing working configuration

    "heartbeat.interval.ms": 60000,
    "heartbeat.topic.name": "heartbeats-mongodb"

I'm getting the following error:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
configuration example:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", 
  "transforms.extractKeyField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "output.format.value": "schema",  
  "change.stream.full.document": "updateLookup",  
  "transforms": "ValueToKey,extractKeyField",  
  "transforms.extractKeyField.field": "documentKey",  
  "collection": "test", 
  "transforms.ValueToKey.fields": "documentKey",
  "key.converter.schemas.enable": "false",  
  "database": "db", 
  "topic.prefix": "prefix", 
  "connection.uri": "mongodb://localhost", 
  "value.converter.schemas.enable": "false",  
  "name": "mongodb-source", 
  "copy.existing": "true", 
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",  
  "poll.max.batch.size": "100",  
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey"
}



 Comments   
Comment by Tasos Zervos [ 12/Sep/23 ]

vlad@guesty.com - have you found a resolution to this last issue?

Comment by Vlad Goldman [ 30/Dec/21 ]

@ross.lawley thank you for the great suggestion to use a predicate; we have been using it in production ever since. However, we have now hit another problem that seems related. When we add a heartbeat to a connector that uses Schema Registry and a subject name strategy like:

value.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

we do get the following error:

org.apache.kafka.common.errors.SerializationException: In configuration value.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicRecordNameStrategy, the message value must only be a record schema

Is there a workaround we can apply, or do we need to implement a custom serializer that skips heartbeat messages?

Comment by Ross Lawley [ 29/Mar/21 ]

Hi vlad@guesty.com,

As described above, SMTs apply to all messages created by a connector. The issue is that the heartbeat messages don't meet the SMTs' requirements. However, the good news is that you can use a predicate to apply the SMTs only to the messages you want.

        "transforms": "ValueToKey,extractKeyField",
        "transforms.ValueToKey.fields": "documentKey",
        "transforms.ValueToKey.predicate": "isPrefixTopic",
        "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.extractKeyField.field": "documentKey",  
        "transforms.extractKeyField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKeyField.predicate": "isPrefixTopic",  
        "predicates": "isPrefixTopic",
        "predicates.isPrefixTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        "predicates.isPrefixTopic.pattern": "prefix.*" // Only apply to topics starting with prefix.

In the above example we match all topics starting with "prefix". You could easily switch that to use negate to exclude all messages sent to the "heartbeats-mongodb" topic.
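
A minimal sketch of that negate variant, assuming the heartbeat topic is "heartbeats-mongodb" as in the original configuration. The predicate name "isHeartbeatTopic" is made up for illustration, and this fragment has not been tested against this connector version. The predicate matches only the heartbeat topic, and "negate": "true" makes each SMT run on every record except those that match:

```json
{
  "transforms": "ValueToKey,extractKeyField",
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ValueToKey.fields": "documentKey",
  "transforms.ValueToKey.predicate": "isHeartbeatTopic",
  "transforms.ValueToKey.negate": "true",
  "transforms.extractKeyField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKeyField.field": "documentKey",
  "transforms.extractKeyField.predicate": "isHeartbeatTopic",
  "transforms.extractKeyField.negate": "true",
  "predicates": "isHeartbeatTopic",
  "predicates.isHeartbeatTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isHeartbeatTopic.pattern": "heartbeats-mongodb"
}
```

Compared with matching on the topic prefix, this form keeps working if the connector later publishes to topics that do not share the prefix, but the pattern must be updated whenever "heartbeat.topic.name" changes.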

I'm closing this ticket as "works as designed" because SMTs are applied after the connector passes the messages to Kafka Connect.

All the best,

Ross

Comment by Githook User [ 29/Mar/21 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Ensure the heartbeat message schema is optional

A non optional schema breaks value conversion.

KAFKA-207 KAFKA-208
Branch: master
https://github.com/mongodb/mongo-kafka/commit/00cfc5e4ee241159acf48fc90a1d6a623dfdfba0

Comment by Ross Lawley [ 24/Mar/21 ]

Hi vlad@guesty.com,

The SMT mechanism is a layer on top of the Source Connector API, so I imagine SMTs are applied across the board to all SourceRecords produced by a Source Connector.

Ross

Comment by Vlad Goldman [ 24/Mar/21 ]

@ross.lawley but why are SMTs even applied to heartbeat records? Shouldn't they just be skipped?

Comment by Ross Lawley [ 17/Mar/21 ]

Hi vlad@guesty.com,

Thanks for the ticket. The heartbeat mechanism sends differently shaped SourceRecords to the configured heartbeat topic. As such, SMTs would need to be able to handle this scenario.

KAFKA-207 may fix this issue, as it changes the shape of the heartbeat records. I will test and report back.

Ross

Generated at Thu Feb 08 09:05:50 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.