  Spark Connector / SPARK-263

Spark Connector times out when connecting through SSL

    • Type: Task
    • Resolution: Done
    • Priority: Major - P3
    • Fix Version/s: None
    • Affects Version/s: 2.4.1
    • Component/s: Configuration
    • Labels: None
    • Environment:
      Ubuntu 18.04, MongoDB 4.0.6, Spark 2.4.4, Scala 2.11.12, mongo-spark-connector_2.11:2.4.1

    • Description:

      Spark gets stuck for 30 seconds and then times out when I try to connect to MongoDB using SSL (ssl=true). I have successfully imported the server's private key and the CA certificate into the Java key store and trust store. I am using PySpark.
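
      For reference, building such stores usually looks roughly like the following (a sketch with placeholder file names, aliases, and passwords; not necessarily the exact commands used here):

      # Import the CA certificate into the trust store (alias and file names are placeholders)
      keytool -importcert -trustcacerts -alias mongo-ca -file ca.pem \
        -keystore /path/to/truststore.ks -storepass tspassword

      # Bundle the server key and certificate as PKCS12, then merge it into the key store
      openssl pkcs12 -export -in server.pem -inkey server.key -out server.p12 -name mongo-server
      keytool -importkeystore -srckeystore server.p12 -srcstoretype PKCS12 \
        -destkeystore /path/to/keystore.ks -deststorepass kspassword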


      This is my code for importing a collection into Spark:


      from pyspark import SparkContext
      from pyspark.sql import DataFrameReader, SQLContext
      
      sc = SparkContext('local', 'script')
      sqlContext = SQLContext(sc)
      
      mongo_url = 'mongodb://<user>:<pass>@<host>:<port>/<database>.<collection>?replicaSet=replica-set-name&ssl=true&authSource=admin&readPreference=nearest&authMechanism=SCRAM-SHA-1'
      mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()
      
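      As a sanity check outside Spark, the same connection can be exercised with pymongo directly (a diagnostic sketch, assuming pymongo 3.x and the CA certificate available as a PEM file; the ca.pem path is a placeholder, and dropping the database/collection from the URI is fine for a pure connectivity test):

      from pymongo import MongoClient
      from pymongo.errors import ServerSelectionTimeoutError

      uri = 'mongodb://<user>:<pass>@<host>:<port>/?replicaSet=replica-set-name&ssl=true&authSource=admin'
      client = MongoClient(uri, ssl_ca_certs='/path/to/ca.pem', serverSelectionTimeoutMS=5000)
      try:
          # Running a command forces server selection, so a broken TLS handshake fails fast here
          print(client.admin.command('ismaster'))
      except ServerSelectionTimeoutError as exc:
          print('Connection fails outside Spark too:', exc)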

      This is the spark-submit command I use to run the script:


      spark-submit \
      --driver-java-options -Djavax.net.ssl.trustStore=/path/to/truststore.ks \
      --driver-java-options -Djavax.net.ssl.trustStorePassword=tspassword \
      --driver-java-options -Djavax.net.ssl.keyStore=/path/to/keystore.ks \
      --driver-java-options -Djavax.net.ssl.keyStorePassword=kspassword \
      --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks \
      --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword=tspassword \
      --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore=/path/to/keystore.ks \
      --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword=kspassword \
      --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
      script.py
      
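      Note that spark-submit keeps only the last value when --driver-java-options is given several times, and only the last --conf entry for a given key, so in the command above only the final -Djavax.net.ssl.keyStorePassword property actually reaches each JVM. A consolidated invocation (a sketch with the same placeholder paths and passwords; in local mode only the driver options should matter, but both are shown) would look like:

      spark-submit \
      --driver-java-options "-Djavax.net.ssl.trustStore=/path/to/truststore.ks -Djavax.net.ssl.trustStorePassword=tspassword -Djavax.net.ssl.keyStore=/path/to/keystore.ks -Djavax.net.ssl.keyStorePassword=kspassword" \
      --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks -Djavax.net.ssl.trustStorePassword=tspassword -Djavax.net.ssl.keyStore=/path/to/keystore.ks -Djavax.net.ssl.keyStorePassword=kspassword" \
      --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
      script.py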

      Finally, this is the output I am getting:

      19/11/18 14:22:40 INFO SparkContext: Created broadcast 0 from broadcast at MongoSpark.scala:542
      19/11/18 14:22:40 INFO cluster: Cluster created with settings {hosts=[<host>:<port>], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='replica-set-name'}
      19/11/18 14:22:40 INFO cluster: Adding discovered server <host>:<port> to client view of cluster
      19/11/18 14:22:40 INFO MongoClientCache: Creating MongoClient: [<host>:<port>]
      19/11/18 14:22:40 INFO cluster: No server chosen by com.mongodb.client.internal.MongoClientDelegate$1@5e46f5b5 from cluster description ClusterDescription{type=REPLICA_SET, connectionMode=MULTIPLE, serverDescriptions=[ServerDescription{address=<host>:<port>, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
      19/11/18 14:22:41 INFO cluster: Exception in monitor thread while connecting to server <host>:<port>
      com.mongodb.MongoSocketReadException: Prematurely reached end of stream
      	at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
      	at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
      	at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
      	at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
      	at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
      	at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
      	at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
      	at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:105)
      	at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:62)
      	at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:129)
      	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
      	at java.lang.Thread.run(Thread.java:748)
      
      Traceback (most recent call last):
        File "script.py", line 16, in <module>
          mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()
        File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
        File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
        File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
      : com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@5e46f5b5. Client view of cluster state is {type=REPLICA_SET, servers=[{address=<host>:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
      	at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:408)
      	at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:123)
      	at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:54)
      	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
      	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:100)
      	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:277)
      	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:181)
      	at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:186)
      	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:155)
      	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:150)
      	at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
      	at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
      	at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
      	at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
      	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
      	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
      	at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
      	at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
      	at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
      	at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
      	at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
      	at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
      	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
      	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
      	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
      	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:282)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:238)
      	at java.lang.Thread.run(Thread.java:748)
      
      
      19/11/18 14:23:10 INFO SparkContext: Invoking stop() from shutdown hook
      19/11/18 14:23:10 INFO MongoClientCache: Closing MongoClient: [<host>:<port>]
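
      The "Prematurely reached end of stream" in the monitor thread means the server closed the socket during the handshake, which with ssl=true usually points at a TLS mismatch (or at the SSL system properties never reaching the JVM, as noted above). The handshake can be checked outside the JVM with openssl (a diagnostic sketch; substitute the real host, port, and CA file):

      openssl s_client -connect <host>:<port> -CAfile /path/to/ca.pem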
      


            Assignee:
            Ross Lawley (ross@mongodb.com)
            Reporter:
            Fernando Lizana (flizanab@gmail.com)
            Votes:
            0
            Watchers:
            3
