[JAVA-4725] BulkWriteResult#getUpserts won't return updated documents. Only inserted
Created: 10/Sep/22  Updated: 30/Sep/22  Resolved: 30/Sep/22

Status: Closed
Project: Java Driver
Component/s: Write Operations
Affects Version/s: None
Fix Version/s: None

Type: Question Priority: Unknown
Reporter: Almog Tavor Assignee: Jeffrey Yemin
Resolution: Done Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Attachments: PNG File image-2022-09-10-19-31-11-675.png    

 Description   

Summary

I'm writing software that upserts data into MongoDB with Project Reactor and Spring. This already works, but I want to make sure that every message in the bulk I'm writing was written successfully: if one message failed while others succeeded, I want to know which message it was. This works for new messages thanks to the `BulkWriteResult` object, but it doesn't work for messages that update existing documents. When one of the messages in my bulk updates an existing document, `bulkWriteResult.getUpserts()` doesn't include that message, and neither does any other field of the `BulkWriteResult`. The test demonstrates this.

While debugging this, I saw that both messages were present in `MixedBulkWriteOperation` (see the attached screenshot).

The MongoDB server version is 5.0, and the Java driver version is 4.7.1.

How to Reproduce

I've created a test class for demonstration:

package ir.integration.mongo;
 
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import lombok.Builder;
import lombok.Data;
import org.bson.Document;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.support.TestPropertySourceUtils;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
 
import java.util.List;
 
import static java.lang.String.format;
 
@ExtendWith(SpringExtension.class)
@DataMongoTest
@ContextConfiguration(initializers = BulkWriteMongoInteractionTests.Initializer.class)
@Testcontainers(disabledWithoutDocker = true)
class BulkWriteMongoInteractionTests {
    public static final String DATABASE_NAME = "myDb";
    @Container
    public static MongoDBContainer MONGO_CONTAINER = new MongoDBContainer("mongo:5.0")
            .withExposedPorts(27017);
    @Autowired
    private ReactiveMongoTemplate mongoTemplate;
 
    @BeforeAll
    static void startContainer() {
        MONGO_CONTAINER.start();
    }
    // WORKING
    @Test
    void givenTwoDocuments_whenPerformingBulkUpsertOfNewDocuments_expectFitBulkWriteResult() {
        Person person1 = Person.builder()
                .id("Person First")
                .address("0905567888")
                .name("eu8afeja8fjeiajkfea")
                .build();
        Person person2 = Person.builder()
                .id("Person Second")
                .address("83928492898")
                .name("jajfieajieaef8")
                .build();
 
        var mono = mongoTemplate.getCollection("myCollection")
                .flatMap(mongoCollection -> {
                    UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1);
                    UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2);
                    var operations = List.of(updateOneModel1, updateOneModel2);
                    return Mono.from(mongoCollection.bulkWrite(operations));
                });
 
        StepVerifier
                .create(mono)
                .expectNextMatches(bulkWriteResult ->
                        bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") &&
                        bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second"))
                .verifyComplete();
    }
 
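    // NOT WORKING: "Person First" already exists when the second bulk runs, so getUpserts() only contains index 1 ("Person Second")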
    @Test
    void givenTwoDocuments_whenPerformingBulkUpsertOfExistingDocument_expectFitBulkWriteResult() {
        Person person1 = Person.builder()
                .id("Person First")
                .address("0905567888")
                .name("eu8afeja8fjeiajkfea")
                .build();
        Person person1update = Person.builder()
                .id("Person First")
                .address("22222222222222222")
                .name("22222222222222222")
                .build();
        Person person2 = Person.builder()
                .id("Person Second")
                .address("83928492898")
                .name("jajfieajieaef8")
                .build();
 
        var mono = mongoTemplate.getCollection("myCollection")
                .flatMap(mongoCollection -> {
                    UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1);
                    var operations = List.of(updateOneModel1);
                    return Mono.from(mongoCollection.bulkWrite(operations));
                })
                .then(mongoTemplate.getCollection("myCollection"))
                .flatMap(mongoCollection -> {
                    UpdateOneModel<Document> updateOneModel1update = getDocumentUpdateOneModel(person1update);
                    UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2);
                    var operations = List.of(updateOneModel1update, updateOneModel2);
                    return Mono.from(mongoCollection.bulkWrite(operations, new BulkWriteOptions().ordered(false)));
                });
 
        StepVerifier
                .create(mono)
                .expectNextMatches(bulkWriteResult ->
                        bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") &&
                        bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second"))
                .verifyComplete();
    }
 
    @NotNull
    private UpdateOneModel<Document> getDocumentUpdateOneModel(Person person) {
        // Upsert keyed on _id: inserts the document if it is absent, otherwise updates it via $set
        Document doc = new Document();
        mongoTemplate.getConverter().write(person, doc);
        var filter = new Document("_id", person.getId());
        return new UpdateOneModel<>(filter, new Document("$set", doc), new UpdateOptions().upsert(true));
    }
 
    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
        @Override
        public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
            TestPropertySourceUtils.addInlinedPropertiesToEnvironment(configurableApplicationContext,
                    format("spring.data.mongodb.uri=%s",
                            MONGO_CONTAINER.getReplicaSetUrl(DATABASE_NAME)));
        }
    }
 
    @Builder
    @Data
    static class Person {
        @Id
        private String id;
        private String address;
        private String name;
    }
}


 Comments   
Comment by Jeffrey Yemin [ 30/Sep/22 ]

I think your understanding is correct.

Comment by Almog Tavor [ 30/Sep/22 ]

I see. But I wonder about the default behavior. When I write a bulk that fails for some of the records, I will definitely get an exception, and I will be able to see which of the records had errors. If that's indeed the case (it looks like it is, from your code), then I think this behavior is fair enough. Is my understanding correct?

Comment by Jeffrey Yemin [ 28/Sep/22 ]

It's not transactional by default, but you can wrap it in a transaction to make it so. You can start at https://www.mongodb.com/docs/manual/core/transactions/ to learn more about that.

Comment by Almog Tavor [ 28/Sep/22 ]

Hey. Thanks for the detailed example. I just learned about getWriteErrors and getWriteResult().getUpserts().
I have just one question. Is it possible for a bulk write of 20 documents to succeed for 10 of them and fail for the other 10?
Those are the main use cases for which I'd like the feature I opened this issue about.
If that's not possible and the bulk write is effectively transactional by default, then I'll be fine using the handling you've described.

Comment by Jeffrey Yemin [ 28/Sep/22 ]

I thought a little bit more about your original question, and before proceeding with a server enhancement request, I want to make sure there is a more complete understanding of the capabilities that the server and driver already provide. Have a look at the test program, which I've annotated with comments explaining what's happening at every stage, and let me know if that changes your understanding at all.

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import org.bson.Document;
 
import java.util.Arrays;
 
public class JAVA4725 {
    public static void main(String[] args) {
        var client = MongoClients.create();
        var coll = client.getDatabase("test").getCollection("JAVA4725");
 
        // clean slate
        coll.drop();
        // Insert a document, making the first updateOne an update rather than an insert
        coll.insertOne(new Document("_id", 1).append("x", 0));
 
        try {
            var result = coll.bulkWrite(Arrays.asList(
                    // this will result in an update
                    new UpdateOneModel<>(Filters.eq("_id", 1), Updates.inc("x", 1),
                            new UpdateOptions().upsert(true)),
                    // this will result in an insert
                    new UpdateOneModel<>(Filters.eq("_id", 2), Updates.inc("x", 1),
                            new UpdateOptions().upsert(true))));
            // If we get to this point (i.e. no exception thrown) it means that all the writes succeeded
            // So any of the updates that do _not_ appear in the upserts list resulted in an update 
            // This will print: "[BulkWriteUpsert{index=1, id=BsonInt32{value=2}}]"
            // which means that the upsert at index 0 resulted in an update
            System.out.println(result.getUpserts());
        } catch (MongoBulkWriteException e) {
            // And if something went wrong, you can still get the upserts list from the exception
            System.out.println(e.getWriteResult().getUpserts());
            // And you can also get the list of write errors. 
            System.out.println(e.getWriteErrors());
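            // For an unordered bulk write, any of the updates (identified by their index in the bulk write array)
            // that appear in neither list resulted in an update. This bulk write is ordered (the default), so the
            // server stops at the first error: operations after the failing index were never attempted and
            // also appear in neither list.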
        }
    }
}
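The index bookkeeping described in the comments above can be sketched in plain Java, without a server. This is a hypothetical helper, not part of the driver. It assumes each model's filter is on a known `_id` (as in the reporter's `getDocumentUpdateOneModel`) and that each `_id` appears at most once in the bulk. In real code, the upsert indexes would come from `BulkWriteUpsert#getIndex()` on `getUpserts()`, and the error indexes from `BulkWriteError#getIndex()` on `MongoBulkWriteException#getWriteErrors()`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BulkOutcomes {

    // Possible fates of one upsert-enabled UpdateOneModel in a bulk write (hypothetical classification).
    enum Outcome { INSERTED, UPDATED, FAILED, NOT_EXECUTED }

    /**
     * @param ids           the _id each model filters on, in submission order (assumed unique)
     * @param upsertIndexes indexes reported as upserts
     * @param errorIndexes  indexes reported as write errors (empty when no exception was thrown)
     * @param ordered       whether the bulk write was ordered (the driver default)
     */
    static Map<String, Outcome> classify(List<String> ids, Set<Integer> upsertIndexes,
                                         Set<Integer> errorIndexes, boolean ordered) {
        // An ordered bulk write stops at the first error, so later operations were never attempted.
        int firstError = errorIndexes.stream().mapToInt(Integer::intValue).min().orElse(Integer.MAX_VALUE);
        Map<String, Outcome> outcomes = new LinkedHashMap<>();
        for (int i = 0; i < ids.size(); i++) {
            Outcome outcome;
            if (errorIndexes.contains(i)) {
                outcome = Outcome.FAILED;
            } else if (ordered && i > firstError) {
                outcome = Outcome.NOT_EXECUTED;
            } else if (upsertIndexes.contains(i)) {
                outcome = Outcome.INSERTED;
            } else {
                // Matched an existing document (it may or may not have actually been modified).
                outcome = Outcome.UPDATED;
            }
            outcomes.put(ids.get(i), outcome);
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // The reporter's second test: "Person First" already exists, so getUpserts() holds only index 1.
        System.out.println(classify(List.of("Person First", "Person Second"), Set.of(1), Set.of(), false));
        // prints {Person First=UPDATED, Person Second=INSERTED}
    }
}
```

The map is keyed by `_id` because that is what the reporter wants back; if the same `_id` can appear twice in one bulk, a per-index list would be the safer return type.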

Comment by Almog Tavor [ 28/Sep/22 ]

OK. I’ll open a corresponding issue for the SERVER project. Wouldn’t a flag solve the case of a very large update? And anyway, shouldn’t the same problem occur for a very large insert? How come the server does return this information for inserts despite the risks? In general, I think a flag like “getOperationInformation” would solve this (and could even allow returning more data, such as the shard key and not just the _id), since in that case the risks would be the user’s responsibility. What do you think?

Comment by Jeffrey Yemin [ 27/Sep/22 ]

almogtavor@gmail.com, the short answer is that it's not included in BulkWriteResult because it's not included in the reply from the server. There is nothing the driver can do here without a corresponding change to the server. We could consider adding it to the server reply and then exposing it in the driver API, but we'd have to be careful in how we do it. Consider, for example, an updateMany operation whose filter matches every document in a billion-document sharded collection. A reply containing the _id of every modified document is probably not what anyone wants, and it would also blow out our 16MB reply limit.
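To put the 16MB point in numbers, here is a back-of-the-envelope sketch. It assumes, hypothetically, that the server reported each modified document as an `{index, _id}` entry shaped like the `upserted` array in the `update` command's reply, with ObjectId `_id`s, and it counts only the BSON bytes of that array, ignoring the rest of the reply. The class and method names are illustrative only.

```java
public class ReplySizeEstimate {

    // Maximum BSON document size: 16 MiB.
    static final long MAX_BSON_BYTES = 16L * 1024 * 1024;

    // Hypothetical entry { index: <int32>, _id: <ObjectId> }:
    // length prefix (4) + int32 element (1 type + 6 for "index\0" + 4 value)
    // + ObjectId element (1 type + 4 for "_id\0" + 12 value) + trailing 0x00 (1) = 33 bytes.
    static final long ENTRY_DOC_BYTES = 4 + (1 + 6 + 4) + (1 + 4 + 12) + 1;

    // Array element i: type byte + decimal key such as "123\0" + the embedded entry document.
    static long arrayElementBytes(long i) {
        return 1 + (Long.toString(i).length() + 1) + ENTRY_DOC_BYTES;
    }

    // Bytes for n array elements, summed per group of indexes that share the same digit count.
    static long bytesForEntries(long n) {
        long total = 0;
        long start = 0; // first index in the current digit group
        long end = 10;  // first index of the next digit group
        int digits = 1;
        while (start < n) {
            long count = Math.min(n, end) - start;
            total += count * (1 + (digits + 1) + ENTRY_DOC_BYTES);
            start = end;
            end *= 10;
            digits++;
        }
        return total;
    }

    // Largest number of entries whose array elements fit within limitBytes.
    static long maxEntriesWithin(long limitBytes) {
        long n = 0;
        long total = 0;
        while (total + arrayElementBytes(n) <= limitBytes) {
            total += arrayElementBytes(n);
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(maxEntriesWithin(MAX_BSON_BYTES));                 // prints 411910
        System.out.println(bytesForEntries(1_000_000_000L) > MAX_BSON_BYTES); // prints true
    }
}
```

Under these assumptions a single reply could list about 411,910 documents, while a billion entries would need roughly 44 GB. Real replies carry extra fields, and larger `_id` values (such as the reporter's string ids) would lower the ceiling further.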

Comment by Almog Tavor [ 16/Sep/22 ]

The issue you've pointed out is relevant here, but I don't understand the rationale behind it. Why doesn't BulkWriteResult include the ids of updated documents as well as inserted ones? And if it can't, where can I find the updated ids?

Comment by Almog Tavor [ 16/Sep/22 ]

You've said that BulkWriteResult.getUpserts will return the IDs of documents inserted as a result of an upsert. But that only covers cases where the document's ID didn't exist in the collection before the upsert operation. When the upsert results in an update (because a document with that ID already exists in the collection), the ID doesn't appear in BulkWriteResult.getUpserts().

That is exactly what the test I've added shows. The getModifiedCount and getMatchedCount methods aren't useful to me, since I need to know exactly which documents have been upserted.

Comment by Ashni Mehta [ 13/Sep/22 ]

Hey Almog, thanks for reaching out. BulkWriteResult.getUpserts will return the IDs of documents inserted as a result of an upsert.

It seems like getModifiedCount and getMatchedCount could be useful for you. You can read more about those here: https://mongodb.github.io/mongo-java-driver/4.7/apidocs/mongodb-driver-core/com/mongodb/client/result/UpdateResult.html#getModifiedCount(). 

Additionally, we have a ticket related to this that'll involve updating this method's javadoc to clarify. Feel free to vote on it!

https://jira.mongodb.org/browse/JAVA-2491

Comment by Almog Tavor [ 13/Sep/22 ]

Have you found anything?

Generated at Thu Feb 08 09:02:49 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.