Improve Spark SQL support


    • Type: Improvement
    • Resolution: Unresolved
    • Priority: Unknown
    • Affects Version/s: 10.7.0, 11.1.0
    • Component/s: None
    • Java Drivers

      Summary

      The MongoCatalog provides partial Spark SQL support. This ticket covers (1) an audit and closure of the gaps in the catalog's DDL/DML support, and (2) a correctness fix to ensure DELETE never silently ignores parts of a WHERE clause.

      Background

      Customers using Spark SQL against the connector currently hit the following limitations:

      • CREATE TABLE ... USING mongodb always fails. Spark SQL automatically attaches table properties (e.g. provider, owner) to every CREATE TABLE statement, and MongoCatalog.createTable throws UnsupportedOperationException for any non-empty properties map. This also breaks CREATE TABLE AS SELECT. (Related: SPARK-309 — support create collection options.)
      • INSERT INTO only behaves correctly with an explicit column list. The table declares ACCEPT_ANY_SCHEMA, so Spark takes field names directly from the query; positional inserts therefore write documents with synthetic column names (col1, col2, ...) rather than failing cleanly or mapping to the collection's fields.
      • UPDATE and MERGE INTO are not supported (SupportsRowLevelOperations is not implemented).
      • ALTER TABLE and ALTER NAMESPACE always throw.

      Correctness issue: DELETE can ignore filters

      MongoTable.deleteWhere converts Spark filters to MongoDB query predicates, but any filter that cannot be converted is silently dropped before deleteMany is executed. Because SupportsDelete.canDeleteWhere is not overridden (its default implementation returns true), Spark assumes that every filter will be applied.

      Consequence: a statement such as

      DELETE FROM mongo.db.people WHERE name = 'Amy' AND <unsupported expression>;
      

      executes as DELETE ... WHERE name = 'Amy' and deletes more documents than the user requested. This is a potential data-loss bug and should be fixed independently of the broader catalog gaps.

      Proposed work

      1. DELETE filter safety (highest priority)

      • Override SupportsDelete.canDeleteWhere to return false unless every filter can be converted to a MongoDB predicate. Spark will then reject the statement at planning time instead of executing a broader delete.
      • As defence in depth, make deleteWhere throw if any filter fails to convert, rather than dropping it.
      • Add integration tests covering: fully supported predicates, partially supported predicates (must fail, not over-delete), and unsupported predicates.
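
      The all-or-nothing rule above can be sketched as follows. This is a minimal, self-contained illustration and not the connector's code: SqlFilter, EqualTo, UnsupportedFilter and DeleteFilterGuard are hypothetical stand-ins (the real implementation would work with Spark's Filter classes and build MongoDB query documents), and the predicate is rendered as a plain string only to keep the example dependency-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for Spark's filter types, used only in this sketch.
interface SqlFilter {}

final class EqualTo implements SqlFilter {
  final String field;
  final String value;

  EqualTo(String field, String value) {
    this.field = field;
    this.value = value;
  }
}

// Represents any expression this sketch cannot translate.
final class UnsupportedFilter implements SqlFilter {
  final String description;

  UnsupportedFilter(String description) {
    this.description = description;
  }
}

final class DeleteFilterGuard {
  private DeleteFilterGuard() {}

  // Translate one filter to a predicate string, or empty if it cannot be converted.
  static Optional<String> convert(SqlFilter filter) {
    if (filter instanceof EqualTo) {
      EqualTo eq = (EqualTo) filter;
      return Optional.of("{\"" + eq.field + "\": \"" + eq.value + "\"}");
    }
    return Optional.empty();
  }

  // Plays the role of canDeleteWhere: true only when every filter converts.
  static boolean canDeleteWhere(List<SqlFilter> filters) {
    for (SqlFilter filter : filters) {
      if (!convert(filter).isPresent()) {
        return false;
      }
    }
    return true;
  }

  // Defence in depth: convert every filter or throw; never drop one silently.
  static List<String> toPredicates(List<SqlFilter> filters) {
    List<String> predicates = new ArrayList<>();
    for (SqlFilter filter : filters) {
      predicates.add(convert(filter).orElseThrow(
          () -> new IllegalArgumentException(
              "Cannot convert filter for DELETE: " + filter.getClass().getSimpleName())));
    }
    return predicates;
  }
}
```

      With this shape, a WHERE clause that mixes a supported and an unsupported predicate is rejected by canDeleteWhere, and toPredicates throws if it is reached anyway, so a narrower filter can never be executed on its own.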

      2. CREATE TABLE support

      • Accept the properties Spark always supplies (provider, owner, location, comment), ignoring those that have no MongoDB equivalent.
      • Map TableCatalog.OPTION_PREFIX-prefixed properties to MongoDB createCollection options (completes SPARK-309).
      • Verify CREATE TABLE AS SELECT works once plain CREATE TABLE is fixed.
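
      One possible shape for the property handling is sketched below. It is an illustration under assumptions, not the connector's implementation: CreateTableProperties is a hypothetical helper, the local OPTION_PREFIX constant is assumed to mirror the value of TableCatalog.OPTION_PREFIX, and silently ignoring every unprefixed property is a simplification (the ticket only requires this for the properties Spark always supplies).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CreateTableProperties {
  // Assumption: mirrors the value of Spark's TableCatalog.OPTION_PREFIX.
  static final String OPTION_PREFIX = "option.";

  private CreateTableProperties() {}

  // Keep only prefixed properties, with the prefix stripped, as candidate
  // createCollection options. Everything else (provider, owner, location,
  // comment, ...) is ignored in this sketch instead of causing a failure.
  static Map<String, String> collectionOptions(Map<String, String> properties) {
    Map<String, String> options = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(OPTION_PREFIX)) {
        options.put(key.substring(OPTION_PREFIX.length()), entry.getValue());
      }
    }
    return options;
  }
}
```

      For example, a properties map containing provider, owner and option.capped would yield a single candidate option, capped. Validating the extracted names against the options that createCollection actually accepts is left out of the sketch.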

      3. Use the declared schema for INSERT (positional inserts)

      Goal: after a CREATE TABLE with a declared schema, a plain INSERT INTO t VALUES (...) should map values to the declared column names.

      The blocker in both phases below is the ACCEPT_ANY_SCHEMA table capability. MongoTable always declares it, which tells Spark to skip resolving a write's columns against the table schema — so positional VALUES keep synthetic names (col1, col2, ...). When the table has a declared schema, the capability should be omitted so Spark positionally maps and casts VALUES to the declared columns, and fails with a clear analysis error on column-count mismatch. It should be kept for inferred-schema tables so existing DataFrame append behaviour is unchanged.
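
      The conditional capability could look roughly like the sketch below. Capability and CapabilitySelector are hypothetical stand-ins for Spark's TableCapability enum and for MongoTable.capabilities(); the set of capabilities listed is illustrative and does not claim to match what the connector declares today.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for Spark's TableCapability enum.
enum Capability { BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA }

final class CapabilitySelector {
  private CapabilitySelector() {}

  // Declared schema: omit ACCEPT_ANY_SCHEMA so Spark resolves writes against it.
  // Inferred schema: keep ACCEPT_ANY_SCHEMA so existing appends behave as before.
  static Set<Capability> capabilitiesFor(boolean schemaDeclared) {
    Set<Capability> capabilities = EnumSet.of(Capability.BATCH_READ, Capability.BATCH_WRITE);
    if (!schemaDeclared) {
      capabilities.add(Capability.ACCEPT_ANY_SCHEMA);
    }
    return capabilities;
  }
}
```

      The sketch takes the declared/inferred distinction as a boolean input; how the connector obtains that flag is a separate question.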

      Phase 1 — Spark metastore (session catalog) tables

      Schemas declared via the session catalog are already persisted by Spark and passed back to the connector on every statement (MongoTableProvider.supportsExternalMetadata returns true):

      CREATE TABLE people (name STRING, address STRING, zip STRING)
      USING mongodb
      OPTIONS (database 'db', collection 'people', 'connection.uri' '...');
      

      So for this path no schema persistence work is needed — the ACCEPT_ANY_SCHEMA change alone enables positional INSERT.

      Open design question: MongoTableProvider.getTable receives a schema both when it is declared (metastore) and when Spark has just inferred it, with no way to distinguish the two. Options: a connector configuration option to opt in to strict schema resolution, or accepting the capability change for all provider-path tables.

      Caveats to document: the schema lives in the Spark metastore, not with the collection (other clusters/tools will not see it), and a persistent metastore (Hive/Glue) is required — the default in-memory catalog loses table definitions when the session ends.

      Phase 2 — MongoCatalog tables (possible follow-up ticket)

      Spark stores schemas only for session-catalog tables; for custom catalogs it delegates all metadata to the catalog. MongoCatalog.createTable currently discards the schema, and loadTable re-infers it from sampled documents, so the declared columns are lost before the next statement runs. To support CREATE TABLE mongo.db.people (...), the connector must persist the schema itself.

      Proposal: create the collection with a $jsonSchema validator derived from the declared schema, using validationLevel: 'off' so it acts as pure metadata with no write enforcement. loadTable reads it back via listCollections and prefers it over inference. (Side benefit: the schema travels with the collection and is visible to other MongoDB tooling.)
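
      To make the proposal concrete, the sketch below builds the createCollection options for a declared schema as nested maps. It is illustrative only: JsonSchemaMetadata is a hypothetical helper, the type mapping covers just a handful of Spark SQL type names chosen for the example, and a real implementation would build BSON documents with the Java driver and handle nested and nullable types.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class JsonSchemaMetadata {
  private JsonSchemaMetadata() {}

  // Illustrative mapping from a few Spark SQL type names to BSON type aliases.
  static String bsonType(String sparkType) {
    switch (sparkType.toUpperCase(Locale.ROOT)) {
      case "STRING": return "string";
      case "INT": return "int";
      case "BIGINT": return "long";
      case "DOUBLE": return "double";
      case "BOOLEAN": return "bool";
      default:
        throw new IllegalArgumentException("No BSON mapping in this sketch for: " + sparkType);
    }
  }

  // Build options with a $jsonSchema validator and validationLevel "off",
  // so the schema is stored as metadata without enforcing it on writes.
  static Map<String, Object> createCollectionOptions(Map<String, String> declaredColumns) {
    Map<String, Object> properties = new LinkedHashMap<>();
    for (Map.Entry<String, String> column : declaredColumns.entrySet()) {
      Map<String, Object> field = new LinkedHashMap<>();
      field.put("bsonType", bsonType(column.getValue()));
      properties.put(column.getKey(), field);
    }

    Map<String, Object> jsonSchema = new LinkedHashMap<>();
    jsonSchema.put("bsonType", "object");
    jsonSchema.put("properties", properties);

    Map<String, Object> validator = new LinkedHashMap<>();
    validator.put("$jsonSchema", jsonSchema);

    Map<String, Object> options = new LinkedHashMap<>();
    options.put("validator", validator);
    options.put("validationLevel", "off");
    return options;
  }
}
```

      Insertion-ordered maps keep the declared column order, which matters if loadTable is to reconstruct the same column positions for positional INSERT.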

      Trade-off to document (both phases): with a declared schema, DataFrame appends to that table also become schema-checked. This is arguably the point of declaring a schema, but it is a behaviour change.

      4. Catalog gap audit

      • Produce a support matrix for all TableCatalog / SupportsNamespaces operations and Spark SQL statements, with expected behaviour and error messages for unsupported paths.
      • Evaluate implementing SupportsRowLevelOperations to enable UPDATE / MERGE INTO (likely a follow-up ticket; document the decision here).
      • Ensure every unsupported operation throws a clear, actionable error that points to the documented alternative (e.g. DataFrame API).

      Acceptance criteria

      • DELETE either applies the complete WHERE clause or fails — it must never execute a partial filter.
      • CREATE TABLE ... USING mongodb succeeds from Spark SQL and creates the collection.
      • After a CREATE TABLE with a declared schema (session catalog in phase 1; MongoCatalog in phase 2), INSERT INTO t VALUES (...) without a column list writes documents with the declared field names, and a column-count mismatch fails at analysis time.
      • A documented support matrix exists for Spark SQL operations (attached).
      • Integration tests cover each supported statement and assert clear errors for each unsupported one.

      Related

      • SPARK-309 — support create collection options in MongoCatalog.createTable

            Assignee:
            Ajay Tandon
            Reporter:
            Ross Lawley