Uploaded image for project: 'Spark Connector'
  1. Spark Connector
  2. SPARK-296

Querying Sharded MongoDB through Spark-MongoDB connector

    • Type: Icon: Task Task
    • Resolution: Gone away
    • Priority: Icon: Critical - P2 Critical - P2
    • None
    • Affects Version/s: None
    • Component/s: Partitioners
    • Labels:

      We are trying to establish a connection with mongoDB from Spark Connector, the total size of collection is around 19000 GB and it is sharded cluster. I am trying to query only 2 mins data which would be around 1 MB max as I implemented predicate pushdown with pipeline clauses at the time of reading of data frame.  but I am getting the error, below is the code snippet.

      /bin/pyspark --conf "spark.mongodb.input.uri=xxxx\
      --conf "spark.mongodb.output.uri=xxxxx \
      --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1

      from_date_iso = "2021-07-05T05:10:00Z"
      to_date_iso = "2021-07-05T05:12:00Z" 

      pipeLine = [{'$project': {"_id": 1}},
      {'$match': {"createDate": {'$gte':

      {'$date': from_date_iso}

      , '$lt': {'$date': to_date_iso}},'$comment' : "AnkitPandey"}}]

      df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
      .option("database", "collection1) \
      .option("collection", "collection1") \
      .option("readPreference.name", "secondaryPreferred") \
      .option("pipeline",pipeLine) \
      .load()

      below is the error.

      py4j.protocol.Py4JJavaError: An error occurred while calling o78.count.
      : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
      Exchange SinglePartition, ENSURE_REQUIREMENTS, id=#18
      +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=count#22L)
        +- *(1) Scan MongoRelation(MongoRDD[0] at RDD at MongoRDD.scala:51,Some(StructType(StructField(_id,LongType,true), StructField(createDate,TimestampType,true), StructField(dataCenter,StringType,true), StructField(dateTaken,LongType,true), StructField(files,ArrayType(StructType(StructField(fileType,StringType,true), StructField(url,StringType,true), StructField(width,IntegerType,true), StructField(height,IntegerType,true), StructField(size,LongType,true)),true),true), StructField(ownerAccountId,LongType,true)))) [] PushedFilters: [], ReadSchema: struct<>

      Need help to rectify this issue. thanks in advance.

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            ankit_pandey02@outlook.com Ankit Pandey
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: