Details
-
Task
-
Resolution: Unresolved
-
Major - P3
-
None
-
None
-
None
Description
It would be great to have a transaction example in the documentation. At first glance, using transactions does not seem trivial to me. My current approach (untested as of right now) is the following:

public class ClientSessionSubscriber extends OperationSubscriber<ClientSession> { |
private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionSubscriber.class); |
List<TransactionContent> transactionContent;
|
IMongoDBDao mongoDBDao; public ClientSessionSubscriber() { } public ClientSessionSubscriber(List<TransactionContent> transactionContent, |
IMongoDBDao mongoDBDao) {
|
this.transactionContent = transactionContent; |
this.mongoDBDao = mongoDBDao; |
} @Override |
public void onNext(ClientSession clientSession) { |
clientSession.startTransaction(); for (TransactionContent content : transactionContent) { |
MongoCollection<Document> collection = content.getCollection();
|
try { |
if (content.type == InsertionType.INSERT) { |
Document document = content.getDocument();
|
mongoDBDao.insertOneBlocking(collection, document, 10, TimeUnit.SECONDS); |
} else if (content.type == InsertionType.UPDATE) { |
String id = content.getId();
|
Bson bson = content.getBson();
|
mongoDBDao.updateOneBlocking(collection, id, bson, 10, TimeUnit.SECONDS, true); |
}
|
} catch (Throwable e) { |
LOGGER.error("MongoDB insert/update transaction failed. Aborting transaction."); |
LOGGER.error(e.getMessage(), e);
|
clientSession.abortTransaction();
|
}
|
}
|
|
clientSession.commitTransaction();
|
LOGGER.info("Successfully inserted {} documents with transaction", transactionContent.size()); |
super.onNext(clientSession); |
} @Override |
public void onComplete() { |
if (super.getError() != null) { |
LOGGER.error("Insertion error occured!"); |
LOGGER.error(getError().getMessage(), getError());
|
return; |
}
|
super.onComplete(); |
}}
|
|
With a "helper" class to store the information for each operation in the transaction (it supports both inserts and updates):
public class TransactionContent { |
public enum InsertionType { |
INSERT, UPDATE
|
}; InsertionType type;
|
MongoCollection<Document> collection;
|
String id;
|
Document document;
|
Bson bson; public TransactionContent(MongoCollection<Document> collection, Document document) { |
this.type = InsertionType.INSERT; |
this.collection = collection; |
this.document = document; |
} public TransactionContent(MongoCollection<Document> collection, String id, Bson bson) { |
this.type = InsertionType.UPDATE; |
this.collection = collection; |
this.id = id; |
this.bson = bson; |
} public MongoCollection<Document> getCollection() { |
return collection; |
} public String getId() { |
return id; |
} public InsertionType getType() { |
return type; |
} public Document getDocument() { |
return document; |
} public Bson getBson() { |
return bson; |
}}
|
My DAO will utilize the ClientSessionSubscriber like this:
@Override |
public ClientSessionSubscriber startTransaction(MongoClient mongoClient, |
List<TransactionContent> transactionContent, long timeout, TimeUnit timeUnit) |
throws Throwable { ClientSessionSubscriber subscriber = |
new ClientSessionSubscriber(transactionContent, (IMongoDBDao) this); |
Publisher<ClientSession> publisher = mongoClient.startSession();
|
publisher.subscribe(subscriber);
|
if (timeout == -1L) { |
subscriber.await();
|
} else { |
subscriber.await(timeout, timeUnit);
|
}
|
return subscriber; |
}
|
|
Is this a valid approach? Some examples in the docs would be great!