Index: src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorIntegrationTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorIntegrationTest.java (revision 0e60679d17cd6d147e319c5be6b7692f24c8f937) +++ src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorIntegrationTest.java (date 1600806750209) @@ -255,6 +255,7 @@ MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName()); sourceProperties.put(MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000"); sourceProperties.put(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, heartbeatTopic); + sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true"); sourceProperties.put("offset.flush.interval.ms", "1000"); addSourceConnector(sourceProperties); Index: src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java (revision 0e60679d17cd6d147e319c5be6b7692f24c8f937) +++ src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java (date 1600806470756) @@ -123,6 +123,7 @@ private void copyDataFrom(final MongoNamespace namespace) { LOGGER.debug("Copying existing data from: {}", namespace.getFullName()); try { + Thread.sleep(20_000); mongoClient .getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName(), RawBsonDocument.class)