  Spark Connector / SPARK-404

Support for multiple collections in a single streaming connection

    • Type: New Feature
    • Resolution: Done
    • Priority: Major - P3
    • Fix Version/s: 10.3.0
    • Affects Version/s: None
    • Component/s: Stream
    • Labels: None
    • Documentation Changes: Needed

      What this is and what needs to be changed in the docs

      This is a new feature, which also involves some backward-incompatible changes that we consider mild enough to ship in the minor release 10.3.0 rather than in a major release, as semantic versioning would otherwise require. We need to:

      1. mention the feature in the Release notes, and explain there the breaking change accompanying it;
      2. [optional, at your discretion] update the wording at https://www.mongodb.com/docs/spark-connector/current/streaming-mode/ and https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read/;
      3. [optional, at your discretion] add an example of reading from multiple collections to https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read/;
      4. update the description of the collection configuration option at https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read-config/;
      5. fix the "Inferring the Schema of a Change Stream" note at https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read/ and the description of the change.stream.publish.full.document.only configuration property at https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read-config/: the connector infers a schema (based on the documents stored in the scanned collections) only if change.stream.publish.full.document.only is true; otherwise it requires a schema to be specified, which is not what the documentation currently says;
      6. update the "Inferring the Schema of a Change Stream" note at https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read/ and the description of the change.stream.publish.full.document.only configuration property at https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-read-config/ with the information provided below about the new feature.

      Description

      Overview

      Currently, the MongoDB Spark Connector can scan (see org.apache.spark.sql.connector.read.Scan), a.k.a. read, a single collection specified via the collection configuration property. There are the following scan modes: batch query, micro-batch stream, continuous stream. SPARK-404 adds support for scanning multiple collections when using either the micro-batch or continuous stream mode.

      Enabling

      The new functionality is enabled via the collection configuration option:

      • To stream from a fixed set of collections, specify the collection names separated by a comma (','). For example, "collectionA,collectionB". Note that the names are separated by a comma only, with no space (' '): a space becomes part of the collection name, so "collectionA, collectionB" names the collections "collectionA" and " collectionB".
      • To stream from all collections in the database (as specified by the database configuration option), specify a string consisting of a single asterisk ('*'). If a collection is created while scanning from all collections, it is automatically picked up for scanning.

      When scanning from multiple collections or from all collections, collections can be dropped freely while they are being scanned.
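
      For illustration, here is a minimal Java sketch of a micro-batch stream reading from two collections. The connection string, database, and collection names are hypothetical, and the spark.mongodb.* option keys follow the form used in the current streaming-read documentation; the comma-separated (or "*") collection value is the new behaviour described above.

        import java.util.concurrent.TimeoutException;

        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.sql.streaming.StreamingQuery;
        import org.apache.spark.sql.streaming.StreamingQueryException;

        public final class MultiCollectionStreamSketch {
            public static void main(String[] args) throws TimeoutException, StreamingQueryException {
                SparkSession spark = SparkSession.builder()
                        .appName("multi-collection-stream-sketch")
                        .getOrCreate();

                // Two collections, separated by a comma only (no space).
                // Using "*" instead would stream from every collection in exampleDb,
                // including collections created after the stream starts.
                Dataset<Row> stream = spark.readStream()
                        .format("mongodb")
                        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017")
                        .option("spark.mongodb.database", "exampleDb")
                        .option("spark.mongodb.collection", "collectionA,collectionB")
                        .option("spark.mongodb.change.stream.publish.full.document.only", "true")
                        .load();

                // Echo the streamed documents to the console just to have a complete pipeline.
                StreamingQuery query = stream.writeStream()
                        .format("console")
                        .outputMode("append")
                        .start();
                query.awaitTermination();
            }
        }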

      Backward-incompatibility

      • If you currently have a collection with a comma (',') in its name, and you use it as the value of the collection configuration option, then the new version of the connector will treat such a value as multiple collections. To avoid this, you must escape the comma.
      • If you currently have a collection named "*", and you use it as the value of the collection configuration option, then the new version of the connector will treat such a value as signifying all collections. To avoid this, you must escape the asterisk.
      • If you currently have a collection with a reverse solidus ('\') in its name, and you use it as the value of the collection configuration option, then the new version of the connector will either report an error or, if the reverse solidus is followed by a comma (','), an asterisk ('*'), or another reverse solidus ('\'), treat the value differently from how the previous version treated it. To avoid this, you must escape the reverse solidus.

      Escaping

      If a collection name contains a comma (',') or a reverse solidus ('\'), or starts with an asterisk ('*'), that character must be escaped with a reverse solidus ('\'). Examples (see also the Java sketch after this list):

      • "mass\, kg"—a single collection named "mass, kg".
      • "\*"—a single collection named "*".
      • "\\"—a single collection named "\". Note that if the value is specified as a string literal in Java code, then each reverse solidus has to be further escaped, leading to having to specify "\\\\".

      Schema inference

      Schema inference happens at the beginning of scanning and does not take into account collections that may be created while scanning, even though such collections are scanned once created when scanning from all collections is configured. If you want to scan a fixed set of collections, specify their names explicitly instead of specifying "*".
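
      To make the two schema behaviours concrete, a hedged Java sketch follows: with change.stream.publish.full.document.only set to true the connector infers the schema from the scanned collections, otherwise the reader must be given a schema explicitly. The option keys, URI, and the illustrative change-event fields are assumptions, not the connector's exact output schema.

        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.sql.types.DataTypes;
        import org.apache.spark.sql.types.StructType;

        public final class ChangeStreamSchemaSketch {
            public static void main(String[] args) {
                SparkSession spark = SparkSession.builder()
                        .appName("change-stream-schema-sketch")
                        .getOrCreate();

                // change.stream.publish.full.document.only = true: the connector samples the
                // scanned collections at the start of the stream and infers the schema itself.
                Dataset<Row> inferred = spark.readStream()
                        .format("mongodb")
                        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017")
                        .option("spark.mongodb.database", "exampleDb")
                        .option("spark.mongodb.collection", "collectionA,collectionB")
                        .option("spark.mongodb.change.stream.publish.full.document.only", "true")
                        .load();

                // change.stream.publish.full.document.only = false: nothing is inferred, so an
                // explicit schema is required. These fields are only an illustration.
                StructType changeEventSchema = new StructType()
                        .add("_id", DataTypes.StringType)
                        .add("operationType", DataTypes.StringType)
                        .add("fullDocument", DataTypes.StringType);

                Dataset<Row> explicit = spark.readStream()
                        .format("mongodb")
                        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017")
                        .option("spark.mongodb.database", "exampleDb")
                        .option("spark.mongodb.collection", "collectionA,collectionB")
                        .schema(changeEventSchema)
                        .load();
            }
        }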

      Performance considerations

      When scanning multiple collections, each collection is sampled sequentially. If the number of collections is large enough, such sequential sampling may take noticeable time.


      Implement the ability to specify multiple collections in the 'MongoDB Spark Connector' connection in Streaming mode.

      Currently the MongoDB Spark Connector is limited to interacting with only one MongoDB collection during each read or write operation. As a result, it does not natively support reading from or writing to multiple collections simultaneously in a single operation. This requires creating a new Spark Connector connection for each collection, resulting in a bad developer experience, and in cases with a large number of collections it prevents the developer from being able to use the streaming mode outright.

      The workaround we are providing customers involves creating a loop that iterates over the list of collections they want to read from and, for each collection, uses the MongoDB Spark Connector to read the data into Spark. This would require the customer to stand up a separate service to implement this logic and to manage mechanisms like connection failures, timeouts, etc.
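
      As a rough sketch of that workaround (in Java rather than the customer's PySpark, with hypothetical names and option keys), one streaming query is started per collection:

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;
        import java.util.concurrent.TimeoutException;

        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.sql.streaming.StreamingQuery;

        public final class PerCollectionWorkaroundSketch {
            public static void main(String[] args) throws TimeoutException {
                SparkSession spark = SparkSession.builder()
                        .appName("per-collection-workaround-sketch")
                        .getOrCreate();

                // One streaming query per collection: this is the loop the workaround describes.
                List<String> collections = Arrays.asList("collectionA", "collectionB", "collectionC");
                List<StreamingQuery> queries = new ArrayList<>();
                for (String collection : collections) {
                    Dataset<Row> stream = spark.readStream()
                            .format("mongodb")
                            .option("spark.mongodb.connection.uri", "mongodb://localhost:27017")
                            .option("spark.mongodb.database", "exampleDb")
                            .option("spark.mongodb.collection", collection)
                            .option("spark.mongodb.change.stream.publish.full.document.only", "true")
                            .load();
                    queries.add(stream.writeStream().format("console").start());
                }
                // Failure handling, restarts, and checkpointing for every query are omitted here,
                // but they are exactly the plumbing the caller has to manage with this approach.
            }
        }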

      Given that the Spark Connector in streaming mode uses MongoDB change streams, it should be possible to connect to all the collections of a given database in a single Spark Connector connection. Further, if the API can be extended to connect to multiple databases in the cluster when setting up a MongoDB Spark Connector connection in streaming mode, it would lead to a lot of benefit.

      Example customer request: https://www.mongodb.com/community/forums/t/pyspark-get-list-of-collections/225327
      It asks: "I would like to execute a query across multiple collections but avoid creating a new spark read session each time I do so."

            Assignee:
            valentin.kovalenko@mongodb.com Valentin Kavalenka
            Reporter:
            prakul.agarwal@mongodb.com Prakul Agarwal
            Votes:
            0
            Watchers:
            5

              Created:
              Updated:
              Resolved: