[JAVA-3542] Add Transaction Example in the docs Created: 20/Aug/18  Updated: 30/Mar/22

Status: Backlog
Project: Java Driver
Component/s: Reactive Streams
Affects Version/s: None
Fix Version/s: None

Type: Task
Priority: Major - P3
Reporter: Jody
Assignee: Unassigned
Resolution: Unresolved
Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate

 Description   

It would be great to have a transaction example in the documentation. At first glance, it does not seem trivial to me. My current approach (untested as of right now) is the following:

 

public class ClientSessionSubscriber extends OperationSubscriber<ClientSession> {
  private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionSubscriber.class);

  List<TransactionContent> transactionContent;
  IMongoDBDao mongoDBDao;

  public ClientSessionSubscriber() {
  }

  public ClientSessionSubscriber(List<TransactionContent> transactionContent,
      IMongoDBDao mongoDBDao) {
    this.transactionContent = transactionContent;
    this.mongoDBDao = mongoDBDao;
  }

  @Override
  public void onNext(ClientSession clientSession) {
    clientSession.startTransaction();

    for (TransactionContent content : transactionContent) {
      MongoCollection<Document> collection = content.getCollection();
      try {
        // Note: an operation only joins the transaction if clientSession is passed
        // to the underlying driver call.
        if (content.type == InsertionType.INSERT) {
          Document document = content.getDocument();
          mongoDBDao.insertOneBlocking(collection, document, 10, TimeUnit.SECONDS);
        } else if (content.type == InsertionType.UPDATE) {
          String id = content.getId();
          Bson bson = content.getBson();
          mongoDBDao.updateOneBlocking(collection, id, bson, 10, TimeUnit.SECONDS, true);
        }
      } catch (Throwable e) {
        LOGGER.error("MongoDB insert/update transaction failed. Aborting transaction.");
        LOGGER.error(e.getMessage(), e);
        // Note: with the Reactive Streams driver, abortTransaction() and commitTransaction()
        // return a Publisher, which does nothing until it is subscribed to.
        clientSession.abortTransaction();
        // Stop here so that an aborted transaction is not committed below.
        return;
      }
    }

    clientSession.commitTransaction();
    LOGGER.info("Successfully inserted {} documents with transaction", transactionContent.size());
    super.onNext(clientSession);
  }

  @Override
  public void onComplete() {
    if (super.getError() != null) {
      LOGGER.error("Insertion error occurred!");
      LOGGER.error(getError().getMessage(), getError());
      return;
    }
    super.onComplete();
  }
}

It uses a "helper" class that stores the information for each operation in the transaction (inserts and updates are both supported):

public class TransactionContent {
  public enum InsertionType {
    INSERT, UPDATE
  }

  InsertionType type;
  MongoCollection<Document> collection;
  String id;
  Document document;
  Bson bson;

  public TransactionContent(MongoCollection<Document> collection, Document document) {
    this.type = InsertionType.INSERT;
    this.collection = collection;
    this.document = document;
  }

  public TransactionContent(MongoCollection<Document> collection, String id, Bson bson) {
    this.type = InsertionType.UPDATE;
    this.collection = collection;
    this.id = id;
    this.bson = bson;
  }

  public MongoCollection<Document> getCollection() {
    return collection;
  }

  public String getId() {
    return id;
  }

  public InsertionType getType() {
    return type;
  }

  public Document getDocument() {
    return document;
  }

  public Bson getBson() {
    return bson;
  }
}

My DAO will utilize the ClientSessionSubscriber like this:

  @Override
  public ClientSessionSubscriber startTransaction(MongoClient mongoClient,
      List<TransactionContent> transactionContent, long timeout, TimeUnit timeUnit)
      throws Throwable {
    ClientSessionSubscriber subscriber =
        new ClientSessionSubscriber(transactionContent, (IMongoDBDao) this);
    Publisher<ClientSession> publisher = mongoClient.startSession();
    publisher.subscribe(subscriber);
    if (timeout == -1L) {
      subscriber.await();
    } else {
      subscriber.await(timeout, timeUnit);
    }
    return subscriber;
  }

Is this a valid approach? Some examples in the docs would be great!



 Comments   
Comment by Ross Lawley [ 17/Dec/18 ]

Moving to Open.

Using the session / transaction API with low-level Publishers may be complex and nuanced. Higher-level libraries such as RxJava 2 or Reactor may provide a composable API that makes handling transactions much simpler.
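
To illustrate what a composable API buys here, the following is a minimal sketch that uses only the JDK. CompletableFuture stands in for a composable reactive type (with Reactor or RxJava, a Mono or Single would presumably play that role), and FakeSession and inTransaction are hypothetical names that are not part of the driver. It only shows the shape of the flow: start, run the work, commit on success, abort on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ComposableTransactionSketch {

    /** Hypothetical stand-in for a driver session; it only records the calls made on it. */
    public static final class FakeSession {
        public final List<String> log = new ArrayList<>();

        void startTransaction() {
            log.add("start");
        }

        CompletableFuture<Void> commitTransaction() {
            log.add("commit");
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> abortTransaction() {
            log.add("abort");
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Runs body in a transaction: commits if it succeeds, aborts and propagates the error if it fails. */
    public static <T> CompletableFuture<T> inTransaction(
            FakeSession session, Function<FakeSession, CompletableFuture<T>> body) {
        session.startTransaction();
        return body.apply(session)
            .<CompletableFuture<T>>handle((result, error) -> {
                if (error == null) {
                    // Success: commit, then hand the original result downstream.
                    return session.commitTransaction().thenApply(ignored -> result);
                }
                // Failure: abort, then fail the returned future with the original error.
                return session.abortTransaction()
                    .thenCompose(ignored -> CompletableFuture.<T>failedFuture(error));
            })
            .thenCompose(future -> future); // flatten the nested future
    }

    public static void main(String[] args) {
        FakeSession ok = new FakeSession();
        inTransaction(ok, s -> CompletableFuture.completedFuture("inserted")).join();
        System.out.println(ok.log); // prints [start, commit]

        FakeSession failing = new FakeSession();
        inTransaction(failing,
                s -> CompletableFuture.<String>failedFuture(new IllegalStateException("write failed")))
            .exceptionally(error -> "rolled back")
            .join();
        System.out.println(failing.log); // prints [start, abort]
    }
}
```

The commit-or-abort decision is made in one place and exactly one of the two runs, which is the property that is hard to guarantee when each step is a hand-written subscriber.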

Comment by Ross Lawley [ 20/Aug/18 ]

Hi jody,

JAVARS-77 was initially closed because Observables aren't composable in the Reactive Streams API, so following the documentation example would be verbose and complex. There are higher-level APIs such as RxJava and Reactor that make the Observable pattern composable and allow for much simpler code when using Client Sessions.

For that reason it was decided that following the documentation example wouldn't make much sense to users.
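
As a rough illustration of the verbosity mentioned above, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones. The just helper, the OneShot base class, and the string values are hypothetical stand-ins and do not touch the driver. Each dependent step can only be started from inside the previous step's onNext callback, so three sequential steps already mean three levels of nesting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class NestedSubscriberSketch {

    /** Hypothetical publisher that emits one value synchronously on the first request, then completes. */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical base subscriber that requests a single item; subclasses supply onNext. */
    abstract static class OneShot<T> implements Flow.Subscriber<T> {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
        }
    }

    /** Chains three dependent steps with raw subscribers and returns the order in which they ran. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        just("session").subscribe(new OneShot<String>() {
            @Override
            public void onNext(String session) {
                log.add("got " + session);
                // The second step has to be started from inside the first callback.
                just("inserted").subscribe(new OneShot<String>() {
                    @Override
                    public void onNext(String result) {
                        log.add(result);
                        // The third step nests one level deeper again.
                        just("committed").subscribe(new OneShot<String>() {
                            @Override
                            public void onNext(String outcome) {
                                log.add(outcome);
                            }
                        });
                    }
                });
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [got session, inserted, committed]
    }
}
```

Error handling and the commit-or-abort decision would have to be threaded through every one of these nested subscribers, which is where a composable operator such as flatMap in a higher-level library tends to help.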

 Ross

 

Comment by Jody [ 20/Aug/18 ]

For some reason the Jira code formatting does not like the Google Java Style :/

I'm sorry for the badly formatted code

Generated at Thu Feb 08 08:59:51 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.