Type: Task
Resolution: Unresolved
Priority: Major - P3
Affects Version/s: None
Component/s: Reactive Streams
Labels: None
It would be great to have a transaction example in the documentation. At first glance, it does not seem trivial to me. My current approach (untested so far) is the following:
public class ClientSessionSubscriber extends OperationSubscriber<ClientSession> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionSubscriber.class);

    List<TransactionContent> transactionContent;
    IMongoDBDao mongoDBDao;

    public ClientSessionSubscriber() {
    }

    public ClientSessionSubscriber(List<TransactionContent> transactionContent, IMongoDBDao mongoDBDao) {
        this.transactionContent = transactionContent;
        this.mongoDBDao = mongoDBDao;
    }

    @Override
    public void onNext(ClientSession clientSession) {
        clientSession.startTransaction();
        // Apply each insert or update in order, blocking on each one.
        for (TransactionContent content : transactionContent) {
            MongoCollection<Document> collection = content.getCollection();
            try {
                if (content.type == InsertionType.INSERT) {
                    Document document = content.getDocument();
                    mongoDBDao.insertOneBlocking(collection, document, 10, TimeUnit.SECONDS);
                } else if (content.type == InsertionType.UPDATE) {
                    String id = content.getId();
                    Bson bson = content.getBson();
                    mongoDBDao.updateOneBlocking(collection, id, bson, 10, TimeUnit.SECONDS, true);
                }
            } catch (Throwable e) {
                LOGGER.error("MongoDB insert/update transaction failed. Aborting transaction.");
                LOGGER.error(e.getMessage(), e);
                clientSession.abortTransaction();
            }
        }
        clientSession.commitTransaction();
        LOGGER.info("Successfully inserted {} documents with transaction", transactionContent.size());
        super.onNext(clientSession);
    }

    @Override
    public void onComplete() {
        if (super.getError() != null) {
            LOGGER.error("Insertion error occurred!");
            LOGGER.error(getError().getMessage(), getError());
            return;
        }
        super.onComplete();
    }
}
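One control-flow detail I want to pin down in a subscriber like this: the transaction should be committed only if every operation succeeded, and a failure should abort once and stop processing the remaining operations. Below is a minimal, driver-independent sketch of that flow. `Tx`, `RecordingTx`, and `TransactionRunner` are hypothetical names used only for illustration; `Tx` stands in for the transactional methods of `ClientSession` and is not the driver's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transactional part of ClientSession (not the driver's API).
interface Tx {
    void start();
    void commit();
    void abort();
}

// Records the order of transaction calls so the flow can be inspected.
class RecordingTx implements Tx {
    final List<String> calls = new ArrayList<>();

    public void start()  { calls.add("start"); }
    public void commit() { calls.add("commit"); }
    public void abort()  { calls.add("abort"); }
}

class TransactionRunner {
    // Runs all operations inside one transaction.
    // Returns true if the transaction was committed, false if it was aborted.
    static boolean runAll(Tx tx, List<Runnable> operations) {
        tx.start();
        try {
            for (Runnable op : operations) {
                op.run();
            }
        } catch (RuntimeException e) {
            // First failure: abort once and skip the remaining operations.
            tx.abort();
            return false;
        }
        // Reached only when every operation completed without throwing.
        tx.commit();
        return true;
    }
}
```

With this shape, a failing operation leads to exactly one `abort()` and no `commit()`, because the `try` block wraps the whole loop rather than each iteration.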
A helper class stores the information for each operation in the transaction (it supports both inserts and updates):
public class TransactionContent {
    public enum InsertionType { INSERT, UPDATE };

    InsertionType type;
    MongoCollection<Document> collection;
    String id;
    Document document;
    Bson bson;

    // Describes an insert of the given document.
    public TransactionContent(MongoCollection<Document> collection, Document document) {
        this.type = InsertionType.INSERT;
        this.collection = collection;
        this.document = document;
    }

    // Describes an update of the document with the given id.
    public TransactionContent(MongoCollection<Document> collection, String id, Bson bson) {
        this.type = InsertionType.UPDATE;
        this.collection = collection;
        this.id = id;
        this.bson = bson;
    }

    public MongoCollection<Document> getCollection() {
        return collection;
    }

    public String getId() {
        return id;
    }

    public InsertionType getType() {
        return type;
    }

    public Document getDocument() {
        return document;
    }

    public Bson getBson() {
        return bson;
    }
}
My DAO uses the ClientSessionSubscriber like this:
@Override
public ClientSessionSubscriber startTransaction(MongoClient mongoClient, List<TransactionContent> transactionContent, long timeout, TimeUnit timeUnit) throws Throwable {
    ClientSessionSubscriber subscriber = new ClientSessionSubscriber(transactionContent, (IMongoDBDao) this);
    Publisher<ClientSession> publisher = mongoClient.startSession();
    publisher.subscribe(subscriber);
    // A timeout of -1 means wait without a time limit.
    if (timeout == -1L) {
        subscriber.await();
    } else {
        subscriber.await(timeout, timeUnit);
    }
    return subscriber;
}
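To make the waiting convention in the method above explicit (a timeout of -1 means "no time limit"), here is a small sketch built only on the JDK's `CountDownLatch`. `CompletionWaiter` and `awaitCompletion` are hypothetical names for illustration; I am assuming a latch-based subscriber, which may not match how `OperationSubscriber` is actually implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CompletionWaiter {
    // Hypothetical helper: waits until the latch reaches zero.
    // A timeout of -1 means wait indefinitely.
    // Returns true if the latch reached zero, false if the timeout elapsed first.
    static boolean awaitCompletion(CountDownLatch done, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (timeout == -1L) {
            done.await();
            return true;
        }
        return done.await(timeout, unit);
    }
}
```

A caller would count the latch down in `onComplete` and `onError`, and treat a `false` return value as a timeout.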
Is this a valid approach? Some examples in the docs would be great!