Details
Type: Task
Resolution: Works as Designed
Priority: Major - P3
Description
Hello, I encountered a problem when using mongo-spark-connector_2.11.
If I access MongoDB directly with MongoClient, everything is fine: the program prints the document count of the collection.
package tour;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoTest2 {
    private static final String dbName = "db_12";
    private static final String colName = "coll_12";

    public static void main(String args[]) {
        try {
            // Connect to the MongoDB service
            MongoClientURI uri = new MongoClientURI("mongodb://root:password@s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707,s-eqwtrdvcba0df74.mongodb.rds.aliyuncs.com:3707/admin");
            MongoClient mongoClient = new MongoClient(uri);

            // Connect to the database
            MongoDatabase mongoDatabase = mongoClient.getDatabase(dbName);
            System.out.println("Connect to database successfully");

            String coll_name = colName;
            MongoCollection<Document> coll = mongoDatabase.getCollection(coll_name);
            long count = coll.count();
            System.out.println("count of " + coll_name + " : " + count);
        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
        }
    }
}
But when I use mongo-spark-connector_2.11 on YARN, the program fails with the following exception:

Caused by: com.mongodb.MongoCommandException: Command failed with error 18 (AuthenticationFailed): 'Authentication failed.' on server s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }

The connection string is the same, so I don't know what is wrong with this code. Does mongo-spark-connector have special requirements for the format of the connection string? (See the URI sketch after the code below.)
The code is below.
package tour;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.Document;

import java.util.HashMap;
import java.util.Map;

public final class MongoTest {
    private static final Logger log = Logger.getLogger("myLogger");
    private static final String dbName = "db_12";
    private static final String colName = "coll_12";

    /**
     * Run this main method to see the output of this quick example.
     *
     * @param args takes an optional single argument for the connection string
     * @throws InterruptedException if a latch is interrupted
     */
    public static void main(final String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        JavaSparkContext jsc = createJavaSparkContext(args);

        Map<String, String> readOverrides = new HashMap<String, String>();
        readOverrides.put("database", dbName);
        readOverrides.put("collection", colName);
        ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

        // Loading and analyzing data from MongoDB
        JavaMongoRDD<Document> rdd = MongoSpark.load(jsc, readConfig);
        JavaRDD<Document> rdd2 = rdd.filter(s -> s.get("url").toString().contains("login") && s.get("url").toString().contains("username"));
        long count = rdd2.count();
        System.out.println("##### " + count);
        log.info("rdd.count() == " + count);
        if (count > 0) {
            System.out.println("##### " + rdd2.first().toJson());
        }

        long endTime = System.currentTimeMillis();
        log.info("total consume: " + (endTime - startTime) + "ms");
    }

    private static JavaSparkContext createJavaSparkContext(final String[] args) {
        String uri = getMongoClientURI(args);
        log.info("mongodb cluster uri: " + uri);
        SparkConf conf = new SparkConf()
                .setAppName("MongoSparkConnectorTour")
                .set("spark.app.id", "MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", uri)
                .set("spark.mongodb.input.collection", String.format("%s.%s", dbName, colName))
                .set("spark.mongodb.output.uri", uri);
        return new JavaSparkContext(conf);
    }

    private static String getMongoClientURI(final String[] args) {
        String uri;
        if (args.length == 0) {
            uri = String.format("mongodb://localhost/%s.%s", dbName, colName); // default
        } else {
            uri = args[0];
        }
        return uri;
    }
}
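One thing I am not sure about is whether the connector expects the authentication database to be given as an ?authSource= option rather than as the URI path (in MongoTest2 above the path is /admin). Below is a minimal sketch of the variant I would try, assuming that is the case; the host names and credentials are the same placeholders as above, and I have not confirmed that this is the required format.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public final class MongoTestAuthSource {
    public static void main(final String[] args) {
        // Hypothetical variant: the target namespace (db_12.coll_12) is in the
        // URI path and the authentication database is set via ?authSource=admin.
        String uri = "mongodb://root:password@s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707,"
                + "s-eqwtrdvcba0df74.mongodb.rds.aliyuncs.com:3707/db_12.coll_12?authSource=admin";

        SparkConf conf = new SparkConf()
                .setAppName("MongoSparkConnectorTour")
                .set("spark.mongodb.input.uri", uri)
                .set("spark.mongodb.output.uri", uri);

        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ... MongoSpark.load(jsc).count() etc., as in MongoTest above ...
        jsc.close();
    }
}

If the connector does read the namespace from the URI path, the separate "database"/"collection" overrides in ReadConfig might then be unnecessary, but I have not verified that either.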
The full stack trace is below:
2018-12-13 14:40:53,350 [main] ERROR com.mongodb.spark.rdd.MongoRDD -
-----------------------------
WARNING: Partitioning failed.
-----------------------------

Partitioning using the 'DefaultMongoPartitioner$' failed.

Please check the stacktrace to determine the cause of the failure or check the Partitioner API documentation.
Note: Not all partitioners are suitable for all toplogies and not all partitioners support views.%n

-----------------------------

Exception in thread "main" com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='root', source='admin', password=<hidden>, mechanismProperties={}}
    at com.mongodb.internal.connection.SaslAuthenticator.wrapException(SaslAuthenticator.java:173)
    at com.mongodb.internal.connection.SaslAuthenticator.access$300(SaslAuthenticator.java:40)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:70)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:47)
    at com.mongodb.internal.connection.SaslAuthenticator.doAsSubject(SaslAuthenticator.java:179)
    at com.mongodb.internal.connection.SaslAuthenticator.authenticate(SaslAuthenticator.java:47)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.authenticateAll(InternalStreamConnectionInitializer.java:151)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:64)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.open(UsageTrackingInternalConnection.java:50)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.open(DefaultConnectionPool.java:390)
    at com.mongodb.internal.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:106)
    at com.mongodb.internal.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:92)
    at com.mongodb.internal.connection.DefaultServer.getConnection(DefaultServer.java:85)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:114)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:211)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:205)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:115)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:108)
    at com.mongodb.operation.CommandReadOperation.execute(CommandReadOperation.java:55)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
    at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:182)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:151)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:146)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:234)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:234)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
    at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:33)
    at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:135)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455)
    at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45)
    at tour.MongoTest.main(MongoTest.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:896)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.mongodb.MongoCommandException: Command failed with error 18 (AuthenticationFailed): 'Authentication failed.' on server s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:293)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.SaslAuthenticator.sendSaslContinue(SaslAuthenticator.java:134)
    at com.mongodb.internal.connection.SaslAuthenticator.access$200(SaslAuthenticator.java:40)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:67)
    ... 54 more
2018-12-13 14:40:53,354 [Thread-1] INFO org.apache.spark.SparkContext - Invoking stop() from shutdown hook
2018-12-13 14:40:53,357 [Thread-30] INFO com.mongodb.spark.connection.MongoClientCache - Closing MongoClient: [s-asdfaxe93f74.mongodb.rds.aliyuncs.com:3707,s-eqwtrdvcba0df74.mongodb.rds.aliyuncs.com:3707]
2018-12-13 14:40:53,362 [Thread-1] INFO org.spark_project.jetty.server.AbstractConnector - Stopped Spark@128a2670{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
2018-12-13 14:40:53,364 [Thread-1] INFO org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://emr-header-1.cluster-70237:4051
2018-12-13 14:40:53,399 [Yarn application state monitor] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - Interrupting monitor thread
2018-12-13 14:40:53,420 [Thread-1] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - Shutting down all executors
2018-12-13 14:40:53,420 [dispatcher-event-loop-0] INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint - Asking each executor to shut down
2018-12-13 14:40:53,423 [Thread-1] INFO org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
2018-12-13 14:40:53,424 [Thread-1] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - Stopped
2018-12-13 14:40:53,427 [dispatcher-event-loop-0] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
2018-12-13 14:40:53,435 [Thread-1] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
2018-12-13 14:40:53,435 [Thread-1] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
2018-12-13 14:40:53,441 [Thread-1] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
2018-12-13 14:40:53,443 [dispatcher-event-loop-3] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
2018-12-13 14:40:53,445 [Thread-1] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext