Type: Task
Resolution: Done
Priority: Major - P3
Affects Version/s: None
Component/s: None
Labels: None
Hi Team,
I am trying to run a program using PySpark but am getting the error below.
Please help me resolve this issue.
18/06/05 02:37:10 INFO storage.BlockManagerMaster: Registered BlockManager
18/06/05 02:37:12 INFO scheduler.EventLoggingListener: Logging events to hdfs://envueltos/user/spark/applicationHistory/local-1528180630550
18/06/05 02:37:12 INFO spark.SparkContext: Registered listener com.cloudera.spark.lineage.ClouderaNavigatorListener
Traceback (most recent call last):
File "/var/tmp/pgh/spark/movie-recommendations.py", line 68, in <module>
df = sqlContext.read.format("com.mongodb.spark.sql").options(uri=uri).load()
File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 139, in load
File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in _call_
File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o57.load.
: java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.sparkSession()Lorg/apache/spark/sql/SparkSession;
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:66)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:748)
18/06/05 02:37:12 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
18/06/05 02:37:12 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
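The NoSuchMethodError in this trace is the classic signature of a Spark/connector version mismatch rather than a bug in the script itself: SQLContext.sparkSession() only exists in Spark 2.x, and mongo-spark-connector_2.11-2.2.1 is built for Spark 2.2.x, while the CDH-5.14.0 parcel paths above point at Spark 1.6. A minimal sketch to confirm which Spark the driver actually runs under (submitted the same way as the script below; nothing here is connector-specific):

#!/usr/bin/env python
# Version probe (a sketch): print the Spark version the driver is linked
# against; mongo-spark-connector 2.2.x expects a Spark 2.2.x runtime.
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("VersionCheck"))
print(sc.version)  # a stock CDH 5.14 parcel reports 1.6.x here
sc.stop()

If this prints 1.6.x, the two sides need to be aligned: either run the job on a Spark 2.x installation, or swap the --jars entries for a connector release built for Spark 1.6 (the mongo-spark-connector 1.1.x line, per the connector's compatibility table; worth checking the cluster's Scala version as well).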
My code is:
/var/tmp/pgh/spark >cat submit-python.sh
#!/bin/bash
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd ${DIR}
spark-submit \
--master local[*] \
--jars /var/tmp/pgh/spark/mongo-java-driver-3.6.1.jar,/var/tmp/pgh/spark/mongo-spark-connector_2.11-2.2.1.jar \
${DIR}/movie-recommendations.py
/var/tmp/pgh/spark >cat movie-recommendations.py
#!/usr/bin/env python
import itertools
from math import sqrt

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
from pyspark.sql import SQLContext


def init_spark_context():
    conf = SparkConf().setAppName("MovieRatings").set("spark.executor.memory", "4g")
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir('/tmp/checkpoint/')
    return sc
def find_best_model(data):
    global bestRank
    global bestLambda
    global bestNumIter
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1
    ranks = [8, 12]
    lambdas = [0.1, 10.0]
    numIters = [10, 20]
    min_error = float('inf')
    training, validation, test = data.randomSplit([0.6, 0.2, 0.2], 6)
    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        ALS.checkpointInterval = 2
        training_data = training.map(lambda xs: [int(x) for x in xs])
        model = ALS.train(training_data, rank, numIter, lmbda)
        validation_data = validation.map(lambda p: (int(p[0]), int(p[1])))
        predictions = model.predictAll(validation_data).map(lambda r: ((r[0], r[1]), r[2]))
        ratings_and_predictions = validation.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        # RMSE over the validation split
        error = sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
        print('For rank %s the RMSE is %s' % (rank, error))
        if error < min_error:
            min_error = error
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter
    print('The best model was trained with rank %s' % bestRank)
def get_user_recommendations(personal_ratings, complete_data, model):
    a = personal_ratings.map(lambda x: x[1])
    b = complete_data.map(lambda x: x[1])
    # movie ids present in the full data set but not yet rated by the user
    user_unrated_movies = b.subtract(a)
    user_unrated_movies = user_unrated_movies.map(lambda x: (0, x)).distinct()
    user_recommendations = model.predictAll(user_unrated_movies).map(lambda r: (r[0], r[1], int(r[2])))
    return user_recommendations
def combine_data(personal_ratings, data):
    personal_ratings = personal_ratings.map(lambda xs: [int(x) for x in xs])
    data = data.map(lambda xs: [int(x) for x in xs])
    d = data.union(personal_ratings)
    return d
if __name__ == "__main__":
    sc = init_spark_context()
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
    sqlContext = SQLContext(sc)
    uri = "mongodb://superuser:pwd@icl-mardb-vm01.nam.nsroot.net:37017/movies.movie_ratings?readPreference=primaryPreferred"

    # Read the movies collection and select only the fields we care about
    df = sqlContext.read.format("com.mongodb.spark.sql").options(uri=uri).load()
    data = df.select('user_id', 'movie_id', 'rating').repartition(16).cache()
    find_best_model(data)

    # Next we get the personal ratings from the personal_ratings collection
    df = sqlContext.read.format("com.mongodb.spark.sql").options(uri=uri, collection="personal_ratings").load()
    personal_ratings = df.select('user_id', 'movie_id', 'rating').repartition(16).cache()

    # Combine personal ratings with existing data
    complete_data_with_user_ratings = combine_data(personal_ratings, data)

    # Train a new model with the best hyperparameters found above
    new_ratings_model = ALS.train(complete_data_with_user_ratings, bestRank, bestNumIter, bestLambda)
    user_recommendations = get_user_recommendations(personal_ratings, complete_data_with_user_ratings, new_ratings_model)

    # Make a DataFrame and save it to MongoDB
    r = sqlContext.createDataFrame(user_recommendations, ['user_id', 'movie_id', 'rating'])
    r.write.format("com.mongodb.spark.sql.DefaultSource").options(uri=uri, collection="user_recommendations").mode("overwrite").save()

    # Clean up
    sc.stop()
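For reference, once a Spark 2.x runtime is available (which is what mongo-spark-connector 2.2.1 targets), this load path is normally driven through a SparkSession rather than a SQLContext. A minimal sketch under that assumption, with a placeholder uri standing in for the one defined above (spark.mongodb.input.uri is the connector's standard input setting):

#!/usr/bin/env python
# Sketch only: assumes a Spark 2.x runtime paired with
# mongo-spark-connector 2.2.x.
from pyspark.sql import SparkSession

uri = "mongodb://user:pwd@host:37017/movies.movie_ratings"  # placeholder

spark = (SparkSession.builder
         .appName("MovieRatings")
         .config("spark.mongodb.input.uri", uri)
         .getOrCreate())

# The same read as the failing line, now backed by a SparkSession
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.select('user_id', 'movie_id', 'rating').show(5)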