Spark Connector / SPARK-259

Spark Connector Custom Codecs not working

    • Type: Bug
    • Resolution: Won't Fix
    • Priority: Major - P3
    • Affects Version/s: 2.3.2
    • Component/s: API
    • Labels: None

      Previous versions in use:
      MongoDB version 3.6.0
      Spark version 2.3.0
      Mongo Spark Connector version 2.2.1
      Scala Version 2.11

      We are using custom Java classes in our query, and we registered custom codecs for these classes by providing a custom MongoClientFactory to the MongoConnector as follows:

      MongoConnector connector = new MongoConnector(new CustomMongoClientFactory(
              ReadConfig.stripPrefix(readConfig.asOptions()).get(MongoSharedConfig.mongoURIProperty()).get()));
      return MongoSpark.builder().connector(connector).javaSparkContext(jsc).readConfig(readConfig).build();

      /**
       * Custom MongoClientFactory which uses the provided codec registry to
       * encode/decode our Java objects into BSON.
       */
      private static class CustomMongoClientFactory implements MongoClientFactory {

          private static final long serialVersionUID = -9033892144353851122L;

          private final String mongoURI;

          public CustomMongoClientFactory(String mongoURI) {
              this.mongoURI = mongoURI;
          }

          @Override
          public MongoClient create() {
              // Attach the custom codec registry to the client options so the
              // MongoClient created for the connector can encode/decode our classes.
              MongoClientOptions.Builder builder = MongoClientOptions.builder()
                      .codecRegistry(CodecRegistryHelper.getCodecRegistry());
              return new MongoClient(new MongoClientURI(mongoURI, builder));
          }
      }

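      CodecRegistryHelper above is our own helper class and its implementation is not included here; a minimal sketch of the kind of registry it returns (assuming the driver's default codecs combined with POJO codecs for domain classes such as RduDataValueUnit, which is an assumption and not our exact implementation) would be:

      import com.mongodb.MongoClient;
      import org.bson.codecs.configuration.CodecRegistries;
      import org.bson.codecs.configuration.CodecRegistry;
      import org.bson.codecs.pojo.PojoCodecProvider;

      /**
       * Sketch only: combines the driver's default codecs with automatically
       * generated POJO codecs for the application's domain classes.
       */
      final class CodecRegistryHelper {

          private CodecRegistryHelper() {
          }

          static CodecRegistry getCodecRegistry() {
              return CodecRegistries.fromRegistries(
                      MongoClient.getDefaultCodecRegistry(),
                      CodecRegistries.fromProviders(
                              PojoCodecProvider.builder().automatic(true).build()));
          }
      }
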
      We use the JavaMongoRDD.withPipeline() method to query MongoDB through the Spark connector, and with the versions above this query picked up the custom codecs we provided.
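
      For illustration, a simplified sketch of how such a pipeline query is built is shown below (the class name PipelineQueryExample and the field name "dataValue" are placeholders, not our production code):

      import java.util.Collections;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.bson.Document;
      import com.mongodb.spark.MongoSpark;
      import com.mongodb.spark.config.ReadConfig;
      import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

      // Simplified sketch: the $match stage embeds a custom Java object, so encoding
      // the pipeline requires a registered codec for that object's class.
      public class PipelineQueryExample {

          public static JavaMongoRDD<Document> query(JavaSparkContext jsc, ReadConfig readConfig,
                                                     Object customValue /* e.g. an RduDataValueUnit instance */) {
              JavaMongoRDD<Document> rdd = MongoSpark.load(jsc, readConfig);
              return rdd.withPipeline(Collections.singletonList(
                      new Document("$match", new Document("dataValue", customValue))));
          }
      }
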
      Versions after upgrading:
      MongoDB version 4.2.0
      Spark version 2.3.0
      Mongo Spark Connector version 2.3.2
      Scala version 2.11

      With these versions, the same withPipeline() call fails with the following exception:

      org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class com.smartstreamrdu.domain.RduDataValueUnit.
      at org.bson.codecs.configuration.CodecCache.getOrThrow(CodecCache.java:46)
      at org.bson.codecs.configuration.ProvidersCodecRegistry.get(ProvidersCodecRegistry.java:63)
      at org.bson.codecs.configuration.ChildCodecRegistry.get(ChildCodecRegistry.java:51)
      at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:184)
      at org.bson.codecs.DocumentCodec.writeMap(DocumentCodec.java:199)
      at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:182)
      at org.bson.codecs.DocumentCodec.writeIterable(DocumentCodec.java:207)
      at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:180)
      at org.bson.codecs.DocumentCodec.writeMap(DocumentCodec.java:199)
      at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:182)
      at org.bson.codecs.DocumentCodec.writeMap(DocumentCodec.java:199)
      at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:141)
      at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:45)
      at org.bson.BsonDocumentWrapper.getUnwrapped(BsonDocumentWrapper.java:195)
      at org.bson.BsonDocumentWrapper.entrySet(BsonDocumentWrapper.java:165)
      at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:109)
      at org.bson.BsonDocument.toJson(BsonDocument.java:835)
      at org.bson.BsonDocument.toJson(BsonDocument.java:825)
      at com.mongodb.spark.config.ReadConfig$$anonfun$14.apply(ReadConfig.scala:455)
      at com.mongodb.spark.config.ReadConfig$$anonfun$14.apply(ReadConfig.scala:455)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      at com.mongodb.spark.config.ReadConfig.withPipeline(ReadConfig.scala:455)
      at com.mongodb.spark.rdd.MongoRDD.withPipeline(MongoRDD.scala:115)
      at com.mongodb.spark.rdd.api.java.JavaMongoRDD.withPipeline(JavaMongoRDD.scala:46)
      

      So, is there any way to register custom codecs with Mongo Spark Connector version 2.3.2?

            Assignee: Unassigned
            Reporter: Varun Ramani (varun.ramani@smartstreamrdu.com)
            Votes: 1
            Watchers: 4
