- Type: New Feature
- Resolution: Duplicate
- Priority: Unknown
- Affects Version/s: None
- Component/s: None
- None
- Java Drivers
- Not Needed
Hi,
We are using the MongoDB Kafka Connect Connector in source mode and having an issue with it.
The connectors we create fully consume the MongoDB collection when no offsets (topic “connect-offsets”) exist for this connector.
To do this, we use the following setting:
"startup.mode": "copy_existing"
The problem is that if we restart the connector or the Kafka Connect service, the connector consumes the entire MongoDB collection a second time.
The connector does this because the only message in the topic is one that marks the copy, not one that carries a data UUID:
{ "_id": "{\"_id\": {\"$oid\": \"661555dda034612d38b79822\"}, \"copyingData\": true}", "copy": "true" }
Looking at the code of the “shouldCopyData” method, this is the default behavior.
/**
 * Checks to see if data should be copied.
 *
 * <p>Copying data is only required if it's been configured and it hasn't already completed.
 *
 * @return true if should copy the existing data.
 */
private static boolean shouldCopyData(
    final SourceTaskContext context, final MongoSourceConfig sourceConfig) {
  Map<String, Object> offset = getOffset(context, sourceConfig);
  return sourceConfig.getStartupConfig().startupMode() == StartupMode.COPY_EXISTING
      && (offset == null || offset.containsKey(COPY_KEY));
}
In our view, the copy should run only if the offset does not exist or the offset does not contain the “COPY_KEY” key, which would avoid consuming the MongoDB collection a second time.
Is this a bug?
Should the condition on the presence of the “copy” key be reversed, so that a copy is not triggered if the offset contains the “copy” key?
return sourceConfig.getStartupConfig().startupMode() == StartupMode.COPY_EXISTING && (offset == null || !offset.containsKey(COPY_KEY));
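To compare the two conditions side by side, here is a minimal standalone sketch. It is not the connector's code: the startup mode comparison is reduced to a boolean, the offset is a plain map, the value of COPY_KEY is assumed to be "copy" (inferred from the offset message above), and the "_id" entry in the third case is a made-up placeholder for an offset that lacks the copy key.

```java
import java.util.HashMap;
import java.util.Map;

public class CopyCheckSketch {
    // Assumption: COPY_KEY is "copy", inferred from the offset message in this ticket.
    static final String COPY_KEY = "copy";

    // Mirrors the current condition; copyExisting stands in for
    // startupMode() == StartupMode.COPY_EXISTING.
    static boolean currentCheck(boolean copyExisting, Map<String, Object> offset) {
        return copyExisting && (offset == null || offset.containsKey(COPY_KEY));
    }

    // Mirrors the reversed condition proposed in this ticket.
    static boolean reversedCheck(boolean copyExisting, Map<String, Object> offset) {
        return copyExisting && (offset == null || !offset.containsKey(COPY_KEY));
    }

    public static void main(String[] args) {
        // Offset that contains the copy key, like the message shown above.
        Map<String, Object> copyOffset = new HashMap<>();
        copyOffset.put(COPY_KEY, "true");

        // Hypothetical offset without the copy key (placeholder content).
        Map<String, Object> otherOffset = new HashMap<>();
        otherOffset.put("_id", "hypothetical-position");

        System.out.println("no offset:         current=" + currentCheck(true, null)
                + " reversed=" + reversedCheck(true, null));
        System.out.println("copy offset:       current=" + currentCheck(true, copyOffset)
                + " reversed=" + reversedCheck(true, copyOffset));
        System.out.println("offset w/o copy:   current=" + currentCheck(true, otherOffset)
                + " reversed=" + reversedCheck(true, otherOffset));
    }
}
```

In this model the reversed condition skips the copy when the copy key is present, but it also returns true for any existing offset that lacks the copy key. If offsets written after the copy phase do not carry that key (an assumption here), a plain negation would re-trigger the copy in that case.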
If this is truly the desired behavior, what is the reason?
And would it be possible to disable this behavior using an option?
Example
startup.mode.copy.disable.if.copy.offset.exist = true
The objective is to deactivate copying if a copy offset already exists.
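As a rough illustration of how such an option could gate the check, here is a standalone sketch. Everything in it is hypothetical: the option does not exist in the connector, the `disableIfCopyOffsetExists` flag stands in for the proposed setting, the startup mode is reduced to a boolean, and COPY_KEY is assumed to be "copy".

```java
import java.util.HashMap;
import java.util.Map;

public class CopyOptionSketch {
    // Assumption: COPY_KEY is "copy", inferred from the offset message in this ticket.
    static final String COPY_KEY = "copy";

    // disableIfCopyOffsetExists models the proposed (hypothetical) option
    // startup.mode.copy.disable.if.copy.offset.exist.
    static boolean shouldCopyData(
            boolean copyExisting,
            Map<String, Object> offset,
            boolean disableIfCopyOffsetExists) {
        if (!copyExisting) {
            return false; // copying was not configured
        }
        if (offset == null) {
            return true; // no offset yet: first start, copy the collection
        }
        if (!offset.containsKey(COPY_KEY)) {
            return false; // offset without the copy key: same result as the current condition
        }
        // Offset holds the copy key: the current condition copies again,
        // the proposed option would suppress that.
        return !disableIfCopyOffsetExists;
    }

    public static void main(String[] args) {
        Map<String, Object> copyOffset = new HashMap<>();
        copyOffset.put(COPY_KEY, "true");

        System.out.println("option off: " + shouldCopyData(true, copyOffset, false));
        System.out.println("option on:  " + shouldCopyData(true, copyOffset, true));
    }
}
```

With the flag off, the sketch matches the condition quoted earlier in this ticket. With it on, only the case where a copy offset already exists changes.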
- Is fixed by: KAFKA-428 Avoid unnecessary copy of data on restart (Closed)