|
In the class org.apache.kafka.connect.runtime.WorkerSourceTask,
inside the function sendRecords(),
we can see that the following is executed first:
-
// Offsets are converted & serialized in the OffsetWriter
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
-
and then
producer.send(...)
is executed. However, if producer.send(...) fails, the offset of the record has already been submitted to offsetWriter.
We also know that org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.schedule(ConnectorTaskId id, WorkerSourceTask workerTask) is invoked periodically,
so the offset of the failed record is then written to the default file: /tmp/connect.offsets
This looks wrong to me. Is this a bug?
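
To make the concern concrete, here is a minimal, self-contained sketch of the ordering described above. It is not Kafka code: the class OffsetBeforeSendSketch, the two maps, and the methods send(), sendRecord() and flush() are all hypothetical stand-ins, and the send is assumed to fail synchronously, whereas the real producer.send(...) is asynchronous and reports failures through a callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetBeforeSendSketch {
    // Hypothetical stand-in for offsetWriter: buffers offsets until flushed.
    static final Map<String, Long> pendingOffsets = new HashMap<>();
    // Hypothetical stand-in for the persisted offsets file.
    static final Map<String, Long> committedOffsets = new HashMap<>();
    // Records that actually reached the simulated broker.
    static final List<String> delivered = new ArrayList<>();

    // Simulated send (assumption: fails synchronously for the value "bad").
    static void send(String value) {
        if (value.equals("bad")) {
            throw new IllegalStateException("simulated send failure");
        }
        delivered.add(value);
    }

    // Mirrors the order in the report: record the offset, then send.
    static void sendRecord(String partition, long offset, String value) {
        pendingOffsets.put(partition, offset);
        send(value); // may throw after the offset was already recorded
    }

    // Stand-in for the periodic offset commit.
    static void flush() {
        committedOffsets.putAll(pendingOffsets);
        pendingOffsets.clear();
    }

    public static void main(String[] args) {
        sendRecord("p0", 1L, "ok");
        try {
            sendRecord("p0", 2L, "bad");
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        }
        flush();
        System.out.println("delivered = " + delivered);        // prints "delivered = [ok]"
        System.out.println("committed = " + committedOffsets); // prints "committed = {p0=2}"
    }
}
```

In this model the committed offset for p0 is 2 even though only the record at offset 1 was delivered, which is the situation I am asking about.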
|