Details
Type: Improvement
Resolution: Fixed
Priority: Major - P3
Description
When an incorrect pipeline is set by accident, it is hard to determine the cause of the resulting error, especially when reconfiguring a connector.
The following logs do not make it clear that the pipeline was incorrect:
(com.mongodb.kafka.connect.source.MongoSourceTask)
[2020-11-19 12:09:45,477] INFO WorkerSourceTask{id=mongo-connector-test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-11-19 12:09:45,478] INFO WorkerSourceTask{id=mongo-connector-test-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-11-19 12:09:46,914] INFO Watching for collection changes on 'test.products' (com.mongodb.kafka.connect.source.MongoSourceTask)
[2020-11-19 12:09:46,916] INFO Resuming the change stream after the previous offset: {"_data": "825FB65F61000000012B022C0100296E5A1004718E568607F245848B25C25D9FD3640E46645F696400645FB65F61926F0CD231104D930004"} (com.mongodb.kafka.connect.source.MongoSourceTask)
[2020-11-19 12:09:46,917] WARN Failed to resume change stream: A pipeline stage specification object must contain exactly one field. 40323

=====================================================================================
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.

To restart the change stream with no resume token either:
* Create a new partition name using the `offset.partition.name` configuration.
* Set `errors.tolerance=all` and ignore the erroring resume token.
* Manually remove the old offset from its configured storage.

Resetting the offset will allow for the connector to be resume from the latest resume
token. Using `copy.existing=true` ensures that all data will be outputted by the
connector but it will duplicate existing data.
=====================================================================================
(com.mongodb.kafka.connect.source.MongoSourceTask)

Example of the incorrect pipeline (the `$addFields` and `$project` stages are combined in a single stage object):
[{ "$addFields": { "myField": "$fullDocument.myField" }, "$project": { "fullDocument": 0 } }]
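The server error in the log (40323) is raised because every stage object in an aggregation pipeline must contain exactly one field, and the example pipeline puts `$addFields` and `$project` in the same object. The following is a minimal sketch of a pre-flight check that would surface this mistake with a clearer message before the change stream is opened. It is a hypothetical helper written for illustration, not the connector's actual validation: the function name `find_invalid_stages` and the corrected two-stage pipeline are assumptions, and the JSON escaping from the connector configuration has been removed.

```python
import json
from typing import List


# Hypothetical helper for illustration; not part of the MongoDB Kafka connector.
# It checks the rule behind server error 40323: every stage object in an
# aggregation pipeline must contain exactly one field (the stage operator).
def find_invalid_stages(pipeline_json: str) -> List[str]:
    """Return a readable problem description for each invalid stage."""
    try:
        pipeline = json.loads(pipeline_json)
    except json.JSONDecodeError as exc:
        return [f"pipeline is not valid JSON: {exc}"]
    if not isinstance(pipeline, list):
        return ["pipeline must be a JSON array of stage objects"]

    problems = []
    for index, stage in enumerate(pipeline):
        if not isinstance(stage, dict):
            problems.append(f"stage {index} is not a JSON object")
        elif len(stage) != 1:
            # Iterating a dict yields its keys in document order.
            fields = ", ".join(stage) if stage else "no fields"
            problems.append(
                f"stage {index} must contain exactly one field, found: {fields}"
            )
    return problems


# The example pipeline from this ticket: two stages combined in one object.
bad = ('[{"$addFields": {"myField": "$fullDocument.myField"}, '
       '"$project": {"fullDocument": 0}}]')
# Assumed intent: the same operations written as two separate stages.
good = ('[{"$addFields": {"myField": "$fullDocument.myField"}}, '
        '{"$project": {"fullDocument": 0}}]')

for name, pipeline in (("bad", bad), ("good", good)):
    print(name, find_invalid_stages(pipeline) or "ok")
```

Splitting the combined object into two stages is one likely fix for the example. This check only enforces the one-field-per-stage rule; it does not validate the stage operators or their arguments, which the server still does when the change stream is opened.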