Type: Task
Resolution: Works as Designed
Priority: Major - P3
Affects Version/s: None
Component/s: Configuration
Hello, I encountered a problem when using mongo-spark-connector_2.11.
If I access MongoDB directly with MongoClient, everything is OK; the program prints the count of the collection:
package tour;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.MongoClientURI;

public class MongoTest2 {
    private static final String dbName = "db_12";
    private static final String colName = "coll_12";

    public static void main(String args[]) {
        try {
            // Connect to the MongoDB service
            MongoClientURI uri = new MongoClientURI("mongodb://root:password@s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707,s-eqwtrdvcba0df74.mongodb.rds.aliyuncs.com:3707/admin");
            MongoClient mongoClient = new MongoClient(uri);

            // Connect to the database
            MongoDatabase mongoDatabase = mongoClient.getDatabase(dbName);
            System.out.println("Connect to database successfully");

            String coll_name = colName;
            MongoCollection coll = mongoDatabase.getCollection(coll_name);
            long count = coll.count();
            System.out.println("count of " + coll_name + " : " + count);
        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
        }
    }
}
But when I use mongo-spark-connector_2.11 on YARN, the program fails with the following exception:
Caused by: com.mongodb.MongoCommandException: Command failed with error 18 (AuthenticationFailed): 'Authentication failed.' on server s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }
But the connection string is the same, so I don't know what is wrong with this code. Does mongo-spark-connector have special requirements for the format of the connection string?
The code is below:
package tour;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.Document;

import java.util.HashMap;
import java.util.Map;

public final class MongoTest {
    private static final Logger log = Logger.getLogger("myLogger");
    private static final String dbName = "db_12";
    private static final String colName = "coll_12";

    /**
     * Run this main method to see the output of this quick example.
     *
     * @param args takes an optional single argument for the connection string
     * @throws InterruptedException if a latch is interrupted
     */
    public static void main(final String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        JavaSparkContext jsc = createJavaSparkContext(args);

        Map<String, String> readOverrides = new HashMap<String, String>();
        readOverrides.put("database", dbName);
        readOverrides.put("collection", colName);
        ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

        // Loading and analyzing data from MongoDB
        JavaMongoRDD<Document> rdd = MongoSpark.load(jsc, readConfig);
        JavaRDD<Document> rdd2 = rdd.filter(s -> s.get("url").toString().contains("login")
                && s.get("url").toString().contains("username"));

        long count = rdd2.count();
        System.out.println("##### " + count);
        log.info("rdd.count() == " + count);
        if (count > 0) {
            System.out.println("##### " + rdd2.first().toJson());
        }

        long endTime = System.currentTimeMillis();
        log.info("total consume: " + (endTime - startTime) + "ms");
    }

    private static JavaSparkContext createJavaSparkContext(final String[] args) {
        String uri = getMongoClientURI(args);
        log.info("mongodb cluster uri: " + uri);
        SparkConf conf = new SparkConf()
                .setAppName("MongoSparkConnectorTour")
                .set("spark.app.id", "MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", uri)
                .set("spark.mongodb.input.collection", String.format("%s.%s", dbName, colName))
                .set("spark.mongodb.output.uri", uri);
        return new JavaSparkContext(conf);
    }

    private static String getMongoClientURI(final String[] args) {
        String uri;
        if (args.length == 0) {
            uri = String.format("mongodb://localhost/%s.%s", dbName, colName); // default
        } else {
            uri = args[0];
        }
        return uri;
    }
}
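For comparison, the connector's documented URI shape puts the target database (and optionally the collection) in the path of spark.mongodb.input.uri and passes the authentication database through the standard authSource connection-string option, rather than putting /admin in the path. Below is a minimal sketch of that form, reusing the hosts and credentials from the report; the class name MongoTestUriSketch and the authSource=admin parameter are assumptions for illustration, not a confirmed fix for this ticket.

package tour;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical variant of createJavaSparkContext(): the target database and collection
// go in the URI path, and the authentication database is given via authSource.
// Intended to be launched through spark-submit on YARN, like MongoTest above.
public final class MongoTestUriSketch {

    public static void main(final String[] args) {
        // Assumed URI shape; the host names and credentials are those from the report.
        String uri = "mongodb://root:password@"
                + "s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707,"
                + "s-eqwtrdvcba0df74.mongodb.rds.aliyuncs.com:3707"
                + "/db_12.coll_12?authSource=admin";

        SparkConf conf = new SparkConf()
                .setAppName("MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", uri)
                .set("spark.mongodb.output.uri", uri);

        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ... MongoSpark.load(jsc), filtering and counting as in MongoTest above ...
        jsc.close();
    }
}

Note that if the real password contains reserved characters such as '@' or ':', it must be URL-encoded in the URI, since both MongoClientURI and the connector parse the string with the standard connection-string rules.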
The stack trace is below:
2018-12-13 14:40:53,350 [main] ERROR com.mongodb.spark.rdd.MongoRDD -
-----------------------------
WARNING: Partitioning failed.
-----------------------------

Partitioning using the 'DefaultMongoPartitioner$' failed.

Please check the stacktrace to determine the cause of the failure or check the Partitioner API documentation.
Note: Not all partitioners are suitable for all toplogies and not all partitioners support views.%n

-----------------------------

Exception in thread "main" com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='root', source='admin', password=<hidden>, mechanismProperties={}}
    at com.mongodb.internal.connection.SaslAuthenticator.wrapException(SaslAuthenticator.java:173)
    at com.mongodb.internal.connection.SaslAuthenticator.access$300(SaslAuthenticator.java:40)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:70)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:47)
    at com.mongodb.internal.connection.SaslAuthenticator.doAsSubject(SaslAuthenticator.java:179)
    at com.mongodb.internal.connection.SaslAuthenticator.authenticate(SaslAuthenticator.java:47)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.authenticateAll(InternalStreamConnectionInitializer.java:151)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:64)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.open(UsageTrackingInternalConnection.java:50)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.open(DefaultConnectionPool.java:390)
    at com.mongodb.internal.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:106)
    at com.mongodb.internal.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:92)
    at com.mongodb.internal.connection.DefaultServer.getConnection(DefaultServer.java:85)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:114)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:211)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:205)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:115)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:108)
    at com.mongodb.operation.CommandReadOperation.execute(CommandReadOperation.java:55)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
    at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:182)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:151)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:146)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:234)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:234)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
    at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:33)
    at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:135)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455)
    at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45)
    at tour.MongoTest.main(MongoTest.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:896)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.mongodb.MongoCommandException: Command failed with error 18 (AuthenticationFailed): 'Authentication failed.' on server s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:293)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.SaslAuthenticator.sendSaslContinue(SaslAuthenticator.java:134)
    at com.mongodb.internal.connection.SaslAuthenticator.access$200(SaslAuthenticator.java:40)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:67)
    ... 54 more
2018-12-13 14:40:53,354 [Thread-1] INFO org.apache.spark.SparkContext - Invoking stop() from shutdown hook
2018-12-13 14:40:53,357 [Thread-30] INFO com.mongodb.spark.connection.MongoClientCache - Closing MongoClient: [s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707,s-eqwtrdvcba0df74.mongodb.rds.aliyuncs.com:3707]
2018-12-13 14:40:53,362 [Thread-1] INFO org.spark_project.jetty.server.AbstractConnector - Stopped Spark@128a2670{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
2018-12-13 14:40:53,364 [Thread-1] INFO org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://emr-header-1.cluster-70237:4051
2018-12-13 14:40:53,399 [Yarn application state monitor] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - Interrupting monitor thread
2018-12-13 14:40:53,420 [Thread-1] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - Shutting down all executors
2018-12-13 14:40:53,420 [dispatcher-event-loop-0] INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint - Asking each executor to shut down
2018-12-13 14:40:53,423 [Thread-1] INFO org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
2018-12-13 14:40:53,424 [Thread-1] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - Stopped
2018-12-13 14:40:53,427 [dispatcher-event-loop-0] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
2018-12-13 14:40:53,435 [Thread-1] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
2018-12-13 14:40:53,435 [Thread-1] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
2018-12-13 14:40:53,441 [Thread-1] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
2018-12-13 14:40:53,443 [dispatcher-event-loop-3] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
2018-12-13 14:40:53,445 [Thread-1] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext