Details
Task
Resolution: Cannot Reproduce
Major - P3
None
0.1
None
None
CentOS Linux release 7.5.1804
Description
Hi team,
I set up an environment for streaming data from Kafka to MongoDB.
My Kafka instance contains more than 4 million messages.
When I start the connector for the first time, 178337 records are inserted into MongoDB successfully.
Then the connector is killed. The console displays the following (I tried setting the log level to TRACE in kafka/config/log4j.properties):
[2019-07-10 14:02:20,285] INFO WorkerSinkTask{id=mongo-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-10 14:02:20,301] INFO Cluster ID: h0zp0aY6RG2dg59xJWCLFA (org.apache.kafka.clients.Metadata:365)
[2019-07-10 14:02:20,302] INFO [Consumer clientId=consumer-1, groupId=connect-mongo-sink] Discovered group coordinator 192.168.1.45:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-10 14:02:20,304] INFO [Consumer clientId=consumer-1, groupId=connect-mongo-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-10 14:02:20,305] INFO [Consumer clientId=consumer-1, groupId=connect-mongo-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-10 14:02:20,314] INFO [Consumer clientId=consumer-1, groupId=connect-mongo-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-10 14:02:20,326] INFO [Consumer clientId=consumer-1, groupId=connect-mongo-sink] Successfully joined group with generation 7 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-10 14:02:20,332] INFO [Consumer clientId=consumer-1, groupId=connect-mongo-sink] Setting newly assigned partitions: CDCDataStream-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-10 14:02:20,625] INFO Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2019-07-10 14:02:20,668] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2019-07-10 14:02:20,770] INFO Opened connection [connectionId{localValue:1, serverValue:8300}] to localhost:27017 (org.mongodb.driver.connection:71)
[2019-07-10 14:02:20,774] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 4, 16]}, minWireVersion=0, maxWireVersion=5, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=1474521} (org.mongodb.driver.cluster:71)
[2019-07-10 14:02:20,786] INFO Opened connection [connectionId{localValue:2, serverValue:8301}] to localhost:27017 (org.mongodb.driver.connection:71)
Killed
The configuration file MongoSinkConnector.properties:
name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Expectation: All messages from Kafka are inserted into Mongo successfully.
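
For anyone trying to reproduce this, the sketch below may help sanity-check the posted configuration. It parses a subset of the properties shown above and compares the `topics` setting with the topic name that appears in the log line "Setting newly assigned partitions: CDCDataStream-0". The helper `load_properties` and the variable names are hypothetical, written only for this illustration. The mismatch it reports may simply mean the file was edited before posting, so treat it as a prompt to double-check the config and not as a diagnosis of the kill.

```python
def load_properties(text):
    """Parse simple key=value lines from Java-style .properties text into a dict."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comment lines
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Subset of the MongoSinkConnector.properties content posted in this ticket
SAMPLE = """\
name=mongo-sink
topics=test
# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=test_kafka
collection=transaction
"""

props = load_properties(SAMPLE)

# Topic name taken from the assigned partition "CDCDataStream-0" in the log
assigned_topic = "CDCDataStream"

configured_topics = [t.strip() for t in props["topics"].split(",")]
if assigned_topic not in configured_topics:
    print(f"topics={props['topics']} does not include {assigned_topic}")
```

The parser handles only plain `key=value` lines and `#` or `!` comments, which covers the file as posted; it does not handle line continuations or escaped characters.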