Details
-
Question
-
Resolution: Done
-
Unknown
-
None
-
None
-
None
Description
Summary
I'm writing software that upserts data into MongoDB using Project Reactor and Spring. The upserts themselves currently succeed, but I want to ensure that every message in the bulk I'm writing was written successfully. If one message failed while the others succeeded, I want to know which message it was. This works for new messages thanks to the `BulkWriteResult` object, but it doesn't work for messages that update existing documents. When one of the messages in my bulk updates an existing document, `bulkWriteResult.getUpserts()` won't include that message, and neither will any other field of the `BulkWriteResult`. The test below demonstrates this.
While debugging this, I saw that both of the messages appeared at the `MixedBulkWriteOperation`:

The MongoDB server version is 5.0. The Java driver version is 4.7.1.
How to Reproduce
I've created a test class for demonstration:
package ir.integration.mongo; |
|
|
import com.mongodb.client.model.BulkWriteOptions; |
import com.mongodb.client.model.UpdateOneModel; |
import com.mongodb.client.model.UpdateOptions; |
import lombok.Builder; |
import lombok.Data; |
import org.bson.Document; |
import org.jetbrains.annotations.NotNull; |
import org.junit.jupiter.api.BeforeAll; |
import org.junit.jupiter.api.Disabled; |
import org.junit.jupiter.api.Test; |
import org.junit.jupiter.api.extension.ExtendWith; |
import org.springframework.beans.factory.annotation.Autowired; |
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; |
import org.springframework.context.ApplicationContextInitializer; |
import org.springframework.context.ConfigurableApplicationContext; |
import org.springframework.data.annotation.Id; |
import org.springframework.data.mongodb.core.ReactiveMongoTemplate; |
import org.springframework.test.context.ContextConfiguration; |
import org.springframework.test.context.junit.jupiter.SpringExtension; |
import org.springframework.test.context.support.TestPropertySourceUtils; |
import org.testcontainers.containers.MongoDBContainer; |
import org.testcontainers.junit.jupiter.Container; |
import org.testcontainers.junit.jupiter.Testcontainers; |
import reactor.core.publisher.Mono; |
import reactor.test.StepVerifier; |
|
|
import java.util.List; |
|
|
import static java.lang.String.format; |
|
|
@ExtendWith(SpringExtension.class) |
@DataMongoTest
|
@ContextConfiguration(initializers = BulkWriteMongoInteractionTests.Initializer.class) |
@Testcontainers(disabledWithoutDocker = true) |
class BulkWriteMongoInteractionTests { |
public static final String DATABASE_NAME = "myDb"; |
@Container |
public static MongoDBContainer MONGO_CONTAINER = new MongoDBContainer("mongo:5.0") |
.withExposedPorts(27017); |
@Autowired |
private ReactiveMongoTemplate mongoTemplate; |
|
|
@BeforeAll |
static void startContainer() { |
MONGO_CONTAINER.start();
|
}
|
// WORKING @Test |
void givenTwoDocuments_whenPerformingBulkUpsertOfNewDocuments_expectFitBulkWriteResult() { |
Person person1 = Person.builder()
|
.id("Person First") |
.address("0905567888") |
.name("eu8afeja8fjeiajkfea") |
.build();
|
Person person2 = Person.builder()
|
.id("Person Second") |
.address("83928492898") |
.name("jajfieajieaef8") |
.build();
|
|
|
var mono = mongoTemplate.getCollection("myCollection") |
.flatMap(mongoCollection -> {
|
UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1);
|
UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2);
|
var operations = List.of(updateOneModel1, updateOneModel2);
|
return Mono.from(mongoCollection.bulkWrite(operations)); |
});
|
|
|
StepVerifier
|
.create(mono)
|
.expectNextMatches(bulkWriteResult ->
|
bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") && |
bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second")) |
.verifyComplete();
|
}
|
|
|
// NOT WORKING |
@Test |
void givenTwoDocuments_whenPerformingBulkUpsertOfExistingDocument_expectFitBulkWriteResult() { |
Person person1 = Person.builder()
|
.id("Person First") |
.address("0905567888") |
.name("eu8afeja8fjeiajkfea") |
.build();
|
Person person1update = Person.builder()
|
.id("Person First") |
.address("22222222222222222") |
.name("22222222222222222") |
.build();
|
Person person2 = Person.builder()
|
.id("Person Second") |
.address("83928492898") |
.name("jajfieajieaef8") |
.build();
|
|
|
var mono = mongoTemplate.getCollection("myCollection") |
.flatMap(mongoCollection -> {
|
UpdateOneModel<Document> updateOneModel1 = getDocumentUpdateOneModel(person1);
|
var operations = List.of(updateOneModel1);
|
return Mono.from(mongoCollection.bulkWrite(operations)); |
})
|
.then(mongoTemplate.getCollection("myCollection")) |
.flatMap(mongoCollection -> {
|
UpdateOneModel<Document> updateOneModel1update = getDocumentUpdateOneModel(person1update);
|
UpdateOneModel<Document> updateOneModel2 = getDocumentUpdateOneModel(person2);
|
var operations = List.of(updateOneModel1update, updateOneModel2);
|
return Mono.from(mongoCollection.bulkWrite(operations, new BulkWriteOptions().ordered(false))); |
});
|
|
|
StepVerifier
|
.create(mono)
|
.expectNextMatches(bulkWriteResult ->
|
bulkWriteResult.getUpserts().get(0).getId().asString().getValue().equals("Person First") && |
bulkWriteResult.getUpserts().get(1).getId().asString().getValue().equals("Person Second")) |
.verifyComplete();
|
}
|
|
|
@NotNull |
private UpdateOneModel<Document> getDocumentUpdateOneModel(Person person1) { |
Document doc = new Document(); |
mongoTemplate.getConverter().write(person1, doc);
|
var filter = new Document("_id", person1.getId()); |
return new UpdateOneModel<>(filter, new Document("$set", doc), new UpdateOptions().upsert(true)); |
}
|
|
|
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> { |
@Override |
public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) { |
TestPropertySourceUtils.addInlinedPropertiesToEnvironment(configurableApplicationContext,
|
format("spring.data.mongodb.uri=%s", |
MONGO_CONTAINER.getReplicaSetUrl(DATABASE_NAME)));
|
}
|
}
|
|
|
@Builder |
@Data |
static class Person { |
@Id |
private String id; |
private String address; |
private String name; |
}
|
}
|
Additional Background