Uploaded image for project: 'Java Driver'
  1. Java Driver
  2. JAVA-3542

Add Transaction Example in the docs

    • Type: Icon: Task Task
    • Resolution: Unresolved
    • Priority: Icon: Major - P3 Major - P3
    • None
    • Affects Version/s: None
    • Component/s: Reactive Streams
    • Labels:
      None

      It would be great to have a Transaction example in the documentation. On the first look, it does not seem trivial to me. My current approach (untested as of right now) is the following:

       

      public class ClientSessionSubscriber extends OperationSubscriber<ClientSession> {
        private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionSubscriber.class);
        List<TransactionContent> transactionContent;
        IMongoDBDao mongoDBDao;  public ClientSessionSubscriber() {  }  public ClientSessionSubscriber(List<TransactionContent> transactionContent,
            IMongoDBDao mongoDBDao) {
          this.transactionContent = transactionContent;
          this.mongoDBDao = mongoDBDao;
        }  @Override
        public void onNext(ClientSession clientSession) {
          clientSession.startTransaction();    for (TransactionContent content : transactionContent) {
            MongoCollection<Document> collection = content.getCollection();
            try {
              if (content.type == InsertionType.INSERT) {
                Document document = content.getDocument();
                mongoDBDao.insertOneBlocking(collection, document, 10, TimeUnit.SECONDS);
              } else if (content.type == InsertionType.UPDATE) {
                String id = content.getId();
                Bson bson = content.getBson();
                mongoDBDao.updateOneBlocking(collection, id, bson, 10, TimeUnit.SECONDS, true);
              }
            } catch (Throwable e) {
              LOGGER.error("MongoDB insert/update transaction failed. Aborting transaction.");
              LOGGER.error(e.getMessage(), e);
              clientSession.abortTransaction();
            }
          }
          
          clientSession.commitTransaction();
          LOGGER.info("Successfully inserted {} documents with transaction", transactionContent.size());
          super.onNext(clientSession);
        }  @Override
        public void onComplete() {
          if (super.getError() != null) {
            LOGGER.error("Insertion error occured!");
            LOGGER.error(getError().getMessage(), getError());
            return;
          }
          super.onComplete();
        }}
      
      

      With a "helper" class to store information about each transaction (support insert and update at the same time)

      public class TransactionContent {
        public enum InsertionType {
          INSERT, UPDATE
        };  InsertionType type;
        MongoCollection<Document> collection;
        String id;
        Document document;
        Bson bson;  public TransactionContent(MongoCollection<Document> collection, Document document) {
          this.type = InsertionType.INSERT;
          this.collection = collection;
          this.document = document;
        }  public TransactionContent(MongoCollection<Document> collection, String id, Bson bson) {
          this.type = InsertionType.UPDATE;
          this.collection = collection;
          this.id = id;
          this.bson = bson;
        }  public MongoCollection<Document> getCollection() {
          return collection;
        }  public String getId() {
          return id;
        }  public InsertionType getType() {
          return type;
        }  public Document getDocument() {
          return document;
        }  public Bson getBson() {
          return bson;
        }}
      

      My DAO will utilize the ClientSessionSubscriber like this:

        @Override
        public ClientSessionSubscriber startTransaction(MongoClient mongoClient,
            List<TransactionContent> transactionContent, long timeout, TimeUnit timeUnit)
            throws Throwable {    ClientSessionSubscriber subscriber =
              new ClientSessionSubscriber(transactionContent, (IMongoDBDao) this);
          Publisher<ClientSession> publisher = mongoClient.startSession();
          publisher.subscribe(subscriber);
          if (timeout == -1L) {
            subscriber.await();
          } else {
            subscriber.await(timeout, timeUnit);
          }
          return subscriber;
        }
      
      

      Is this a valid approach? Some examples in the docs would be great!

            Assignee:
            Unassigned Unassigned
            Reporter:
            jody Jody
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated: