Uploaded image for project: 'Java Driver'
  1. Java Driver
  2. JAVA-3938

Change stream connections not returned

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major - P3
    • None
    • Affects Version/s: 4.1.0
    • Component/s: Async, Change Streams
    • Labels:
      None

      When the maximum number of connections is 100, the following code fails to complete:

          /**
           * Reproducer for the issue: subscribes to 200 change streams one after another,
           * cancels each subscription from a helper thread roughly one second after it is
           * established, and after every subscription runs a {@code countDocuments()} query,
           * blocking the main thread until that query signals completion.
           *
           * @param args unused
           * @throws InterruptedException if the main thread is interrupted while waiting for a
           *         count query to complete
           */
          public static void main(final String[] args) throws InterruptedException {
              MongoClient mongoClient = MongoClients.create("mongodb://localhost");
              MongoDatabase db = mongoClient.getDatabase("import");
              // Collection name is kept exactly as spelled in the original report.
              MongoCollection<Document> gameEntityCollection = db.getCollection("gameEnitites");
              for (int i = 0; i < 200; i++) {
                  final int subscribeNumber = i;
                  System.out.println("Subscribe number " + subscribeNumber);
                  gameEntityCollection.watch().subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
                      @Override
                      public void onSubscribe(Subscription s) {
                          s.request(1);
                          // Cancel from a separate thread after a delay so that the
                          // subscribing thread is not blocked by the sleep.
                          new Thread(() -> {
                              try {
                                  Thread.sleep(1000);
                                  s.cancel();
                                  System.out.println("Cancelled Subscribe number " + subscribeNumber);
                              } catch (InterruptedException e) {
                                  // Restore the interrupt flag before reporting the interruption.
                                  Thread.currentThread().interrupt();
                                  e.printStackTrace();
                              }
                          }).start();
                      }

                      @Override
                      public void onNext(ChangeStreamDocument<Document> t) {
                          // Change events are irrelevant to the reproducer.
                      }

                      @Override
                      public void onError(Throwable t) {
                          t.printStackTrace();
                      }

                      @Override
                      public void onComplete() {
                          System.out.println("Completed Subscribe number " + subscribeNumber);
                      }
                  });

                  // Block until the count query completes before opening the next change stream.
                  CountDownLatch cd = new CountDownLatch(1);
                  gameEntityCollection.countDocuments().subscribe(new Subscriber<Long>() {
                      @Override
                      public void onSubscribe(Subscription s) {
                          s.request(Long.MAX_VALUE);
                      }

                      @Override
                      public void onNext(Long aLong) {
                          System.out.println("Subscribe number " + subscribeNumber + " collection size: " + aLong);
                      }

                      @Override
                      public void onError(Throwable t) {
                          // Surface the failure instead of swallowing it silently.
                          // NOTE(review): the latch is intentionally not released here so the
                          // reproducer's "fails to complete" symptom is unchanged - confirm.
                          t.printStackTrace();
                      }

                      @Override
                      public void onComplete() {
                          cd.countDown();
                      }
                  });
                  cd.await();
              }
          }
      

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            ross@mongodb.com Ross Lawley
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Created:
              Updated:
              Resolved: