Spark Connector / SPARK-349

Can't stream to csv from mongodb

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Unknown
    • Affects Version/s: None
    • Component/s: None
    • Labels: None

      What did I use

      • Databricks Runtime Version 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
      • org.mongodb.spark:mongo-spark-connector:10.0.1 (attached as in the sketch below)
      • MongoDB 5.0
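
      For reference, a minimal sketch of how the connector can be attached
      outside Databricks (on Databricks 10.4 LTS it is installed as a cluster
      library instead; the app name below is hypothetical):

      from pyspark.sql import SparkSession

      # sketch, assuming a plain pyspark/spark-submit environment;
      # fetches the connector from Maven Central at startup
      spark = (
        SparkSession.builder
        .appName("mongo-stream-repro")  # hypothetical app name
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.1")
        .getOrCreate()
      )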

      What did I do

      I created a collection with the following code:

      from pyspark.sql.types import StructType, StructField, StringType, IntegerType

      data = [
        ["student_1", 89],
        ["student_2", 77],
        ["student_3", 64],
      ]

      # score is an integer in the data, so the field is IntegerType
      schema = StructType([
        StructField("username", StringType()),
        StructField("score", IntegerType()),
      ])

      df = spark.createDataFrame(data, schema)

      # database, collection and connection_uri are defined elsewhere
      (
        df
        .write
        .format("mongodb")
        .option("database", database)
        .option("collection", collection)
        .option("connection.uri", connection_uri)
        .mode("overwrite")
        .save()
      )
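
      As a quick sanity check (a sketch, reusing the same variables), the
      collection can be read back with a batch read:

      # sketch: batch read-back to confirm the rows were written
      df_check = (
        spark.read
        .format("mongodb")
        .option("database", database)
        .option("collection", collection)
        .option("connection.uri", connection_uri)
        .load()
      )
      df_check.show()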
      

      Then I followed the instructions here to stream the data from the collection to a CSV file.

      # define a streaming query
      query = (spark
        .readStream
        .format("mongodb")
        .option("spark.mongodb.connection.uri", connection_uri)
        .option("spark.mongodb.database", database)
        .option("spark.mongodb.collection", collection)
        .schema(schema)
        .load()
        # manipulate your streaming data
        .writeStream
        .format("csv")
        .option("path", "/output/")
        .option("checkpointLocation", "/checkpoint/")
        .trigger(continuous="1 second")
        .outputMode("append")
      )
      
      # run the query
      query.start()
      

      What did I expect

      The query starts successfully

      What did I get

      I got the following error:

      java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)

      I tried disabling the trigger in the write stream:

        .writeStream
        .format("csv")
        .option("path", "/output/")
        .option("checkpointLocation", "/checkpoint/")
      #   .trigger(continuous="1 second")
        .outputMode("append")
      

      Then I got another error:

      java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
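
      For context, the two errors together point at a mismatch: the csv (file)
      sink only supports micro-batch triggers, while the mongodb source in
      connector 10.0.1 only supports continuous processing. As a sketch, a sink
      that does support continuous mode, such as the console sink, should let
      the query start (the checkpoint path below is hypothetical):

      # sketch: same mongodb read, but writing to the console sink,
      # which supports Spark's continuous processing mode
      (spark
        .readStream
        .format("mongodb")
        .option("spark.mongodb.connection.uri", connection_uri)
        .option("spark.mongodb.database", database)
        .option("spark.mongodb.collection", collection)
        .schema(schema)
        .load()
        .writeStream
        .format("console")
        .option("checkpointLocation", "/checkpoint-console/")  # hypothetical path
        .trigger(continuous="1 second")
        .outputMode("append")
        .start()
      )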

       

            Assignee: Unassigned
            Reporter: Kit Yam Tse (me@kytse.com)
            Votes: 0
            Watchers: 3