[SPARK-185] mongodb-spark connector

    • Type: Task
    • Resolution: Done
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: None
    • Labels: None

      Hi Team,

      I am trying to run a program using PySpark but am getting the error below.

      Please help me resolve this issue.

      18/06/05 02:37:10 INFO storage.BlockManagerMaster: Registered BlockManager
      18/06/05 02:37:12 INFO scheduler.EventLoggingListener: Logging events to hdfs://envueltos/user/spark/applicationHistory/local-1528180630550
      18/06/05 02:37:12 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.ClouderaNavigatorListener
      Traceback (most recent call last):
        File "/var/tmp/pgh/spark/movie-recommendations.py", line 68, in <module>
          df = sqlContext.read.format("com.mongodb.spark.sql").options(uri=uri).load()
        File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 139, in load
        File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in _call_
        File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
        File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o57.load.
      : java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.sparkSession()Lorg/apache/spark/sql/SparkSession;
              at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:66)
              at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
              at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
              at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
              at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
              at py4j.Gateway.invoke(Gateway.java:259)
              at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
              at py4j.commands.CallCommand.execute(CallCommand.java:79)
              at py4j.GatewayConnection.run(GatewayConnection.java:209)
              at java.lang.Thread.run(Thread.java:748)

      18/06/05 02:37:12 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
      18/06/05 02:37:12 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
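
      For context on the stack trace: SQLContext.sparkSession() was added in Spark 2.0, and mongo-spark-connector 2.2.1 is built against Spark 2.2.x, while the Spark bundled with CDH 5.14 is 1.6.x, which has no such method. A minimal diagnostic sketch for confirming which runtime the job actually got, reusing the script's own SparkContext:

      # Minimal check, assuming the `sc` created in movie-recommendations.py
      # below: the 2.2.x connector needs a Spark 2.2.x runtime, so a 1.6.x
      # version printed here would explain the NoSuchMethodError above.
      print(sc.version)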

       

      My code is:

      /var/tmp/pgh/spark >cat submit-python.sh
      #!/bin/bash

      DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
      cd ${DIR}
      spark-submit \
      --master local[*] \
      --jars /var/tmp/pgh/spark/mongo-java-driver-3.6.1.jar,/var/tmp/pgh/spark/mongo-spark-connector_2.11-2.2.1.jar \
      ${DIR}/movie-recommendations.py
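
      If the job has to run on the cluster's default Spark 1.6, the usual fix is to match the connector to that Spark series rather than ship the 2.2.1 jars. A sketch of the adjusted submit, assuming CDH 5.14's Spark 1.6 is the Scala 2.10 build (hence the _2.10 artifact; --packages also pulls the matching Java driver transitively):

      spark-submit \
      --master local[*] \
      --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 \
      ${DIR}/movie-recommendations.py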

       

      /var/tmp/pgh/spark >cat movie-recommendations.py

      #!/usr/bin/env python

      import itertools
      from math import sqrt

      from pyspark import SparkConf, SparkContext
      from pyspark.mllib.recommendation import ALS
      from pyspark.sql import SQLContext

      def init_spark_context():
          conf = SparkConf().setAppName("MovieRatings").set("spark.executor.memory", "4g")
          sc = SparkContext(conf=conf)
          sc.setCheckpointDir('/tmp/checkpoint/')
          return sc

      def find_best_model(data):
          global bestRank
          global bestLambda
          global bestNumIter
          bestRank = 0
          bestLambda = -1.0
          bestNumIter = -1
          ranks = [8, 12]
          lambdas = [0.1, 10.0]
          numIters = [10, 20]
          min_error = float('inf')
          training, validation, test = data.randomSplit([0.6, 0.2, 0.2], 6)
          for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
              ALS.checkpointInterval = 2
              training_data = training.map(lambda xs: [int(x) for x in xs])
              model = ALS.train(training_data, rank, numIter, lmbda)
              validation_data = validation.map(lambda p: (int(p[0]), int(p[1])))
              predictions = model.predictAll(validation_data).map(lambda r: ((r[0], r[1]), r[2]))
              ratings_and_predictions = validation.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
              error = sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean())
              print 'For rank %s the RMSE is %s' % (rank, error)
              if error < min_error:
                  min_error = error
                  bestRank = rank
                  bestLambda = lmbda
                  bestNumIter = numIter
          print 'The best model was trained with rank %s' % bestRank

      def get_user_recommendations(personal_ratings, complete_data, model):
          # Use the function's own parameters rather than module-level globals
          a = personal_ratings.map(lambda x: (x[1]))
          b = complete_data.map(lambda x: (x[1]))
          user_unrated_movies = b.subtract(a)
          user_unrated_movies = user_unrated_movies.map(lambda x: (0, x)).distinct()
          user_recommendations = model.predictAll(user_unrated_movies).map(lambda r: (r[0], r[1], int(r[2])))
          return user_recommendations

      def combine_data(personal_ratings, data):
          personal_ratings = personal_ratings.map(lambda xs: [int(x) for x in xs])
          data = data.map(lambda xs: [int(x) for x in xs])
          d = data.union(personal_ratings)
          return d

      if __name__ == "__main__":

          sc = init_spark_context()
          logger = sc._jvm.org.apache.log4j
          logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

          sqlContext = SQLContext(sc)
          uri = "mongodb://superuser:pwd@icl-mardb-vm01.nam.nsroot.net:37017/movies.movie_ratings?readPreference=primaryPreferred"

          # Read movies collection and select only fields we care about
          df = sqlContext.read.format("com.mongodb.spark.sql").options(uri=uri).load()
          data = df.select('user_id', 'movie_id','rating').repartition(16).cache()

          find_best_model(data)

          # Next we get the personal ratings from the personal ratings_collection
          df = sqlContext.read.format("com.mongodb.spark.sql").options(uri=uri, collection="personal_ratings").load()
          personal_ratings = df.select('user_id', 'movie_id','rating').repartition(16).cache()

          # Combine personal ratings with existing data
          complete_data_with_user_ratings = combine_data(personal_ratings, data)

          # Train new model
          new_ratings_model = ALS.train(complete_data_with_user_ratings, bestRank, bestNumIter, bestLambda)
          user_recommendations = get_user_recommendations(personal_ratings, complete_data_with_user_ratings, new_ratings_model)

          # Make a DataFrame. Save to MongoDB
          r = sqlContext.createDataFrame(user_recommendations, ['user_id', 'movie_id', 'rating'])
          r.write.format("com.mongodb.spark.sql.DefaultSource").options(uri=uri, collection="user_recommendations").mode("overwrite").save()

          #Clean up
          sc.stop()
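
      Alternatively, if a Spark 2.x runtime is available (Cloudera ships one as a separate Spark 2 parcel, submitted with spark2-submit), the existing mongo-spark-connector 2.2.1 jars can stay and the script can use the Spark 2 entry point. A minimal read sketch under that assumption, with the same uri value as above:

      # Hedged sketch assuming a Spark 2.x runtime: keeps connector 2.2.1
      # and reads through a SparkSession instead of a bare SQLContext.
      from pyspark.sql import SparkSession

      uri = "mongodb://superuser:pwd@icl-mardb-vm01.nam.nsroot.net:37017/movies.movie_ratings?readPreference=primaryPreferred"

      spark = (SparkSession.builder
               .appName("MovieRatings")
               .config("spark.mongodb.input.uri", uri)
               .getOrCreate())

      # With spark.mongodb.input.uri set, load() needs no per-call options.
      df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
      df.printSchema()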

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Preeti Gupta (preeti3.gupta@citi.com)
            Votes: 0
            Watchers: 2
