[JAVA-4446] can't connect to server Created: 20/Jan/22  Updated: 04/May/22  Resolved: 01/Feb/22

Status: Closed
Project: Java Driver
Component/s: Reactive Streams
Affects Version/s: 4.4.0, 4.4.1
Fix Version/s: None

Type: Question Priority: Unknown
Reporter: 夕 一 Assignee: Ross Lawley
Resolution: Incomplete Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Attachments: Text File bussiness error.log     PNG File image-2022-01-20-10-33-35-310.png     PNG File image-2022-01-20-10-34-22-363.png     PNG File image-2022-01-20-10-36-44-226.png     PNG File image-2022-01-20-10-44-51-350.png     PNG File image-2022-01-20-10-47-14-797.png     PNG File image-2022-01-20-10-47-44-109.png     PNG File image-2022-01-20-10-48-35-473.png     PNG File image-2022-01-20-10-48-46-127.png     PNG File image-2022-01-20-10-50-10-031.png     PNG File image-2022-01-20-10-50-36-022.png     PNG File image-2022-01-20-10-50-43-995.png     PNG File image-2022-01-20-10-50-48-027.png     PNG File image-2022-01-20-10-51-48-988.png     Text File netty error report.log     PNG File screenshot-1.png     PNG File screenshot-2.png     PNG File screenshot-3.png    

 Description   

Summary

i'm using mongodb-driver-reactivestreams to connect to my mongodb server, the version is 

4.4.0, i have receive warn alarm some times, i found the alarm interface are connect to the mongodb. my project throw error 

and i found the netty info is here

 

the server version is:

the mongo server is cluster mode, the program connect to the load balance proxy.

 

my application is reform from a block system to a reactive system, from spring mvc to spring webflux, using kotlin and kotlin coroutines, when i work with the block driver, the issure dosn't appear, when i upgrade to reactive stream version, the error appear occasionally, i have read some code, the driver dosn't log any error message. 

Please provide the version of the driver. If applicable, please provide the MongoDB server version and topology (standalone, replica set, or sharded cluster).

i have wrote a mongo adapter to work with mongo server, the mongo adapter inject an mongoClient instance which product by spring

mmy code is here

 

 

 

 

my prog log is here:

Steps to reproduce. If possible, please include a Short, Self Contained, Correct (Compilable), Example.

Additional Background

Please provide any additional background information that may be helpful in diagnosing the bug.



 Comments   
Comment by Ross Lawley [ 01/Feb/22 ]

Please follow up on one of the support channels previously mentioned.

Comment by 夕 一 [ 24/Jan/22 ]

the business error and the netty info are 30 days.
netty error report.log
bussiness error.log

Comment by 夕 一 [ 24/Jan/22 ]

thanks very much,i have read the driver source, and debug into it. i found the netty report connect timeout much more then business code report error, the timeout info logger appear 7111 times and business code report error 7 times, which indicate the network may is ok when netty report network traffic, when the poll has connected connection, the bussiness code work well.
i would connect to the server direct to avoid connect one proxy, so my server list may increment to 3, and error may reduce a lot, is there any improvement work to do, i found the redis error is almost never found, i use the lettuce, the driver only make one connection.

Comment by Ross Lawley [ 21/Jan/22 ]

Hi lovewebmail@gmail.com,

As jeff.yemin mentions there is no evidence of a bug, this is expected behaviour of the driver when it cannot connect to a server.

The error message:

ServerDescription{address=10.209.12.203:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketWriteException: 
Exception sending message,  caused by {io.netty.channel.StacklessClosedChannelException}}]}. Waiting for 30000 ms before timing out

Shows that the application cannot connect to the MongoDB at that address. The reasons why it can't connect are likely either:

  1. Invalid host - a typo on the ip address you are trying to connect to or an invalid ip address from the point of the application.
  2. Inaccessible host - the host is not configured to allow network traffic from the application's ip address - eg some network firewall / ip tables rules
  3. IP Binding - the MongoDB is configured not to allow connections from the application ip address

I hope that helps,

As jeff.yemin mentioned please reach out to one of the support options:

  • Our MongoDB support portal, located at support.mongodb.com
  • Our MongoDB community portal, located here
  • If you are an Atlas customer, there is free support offered 24/7 in the lower right hand corner of the UI

Just in case you have already opened a support case and are not receiving sufficient help, please let me know and I can facilitate escalating your issue.

All the best,

Ross

Comment by 夕 一 [ 20/Jan/22 ]

it only throw an error, but the error may wrap many times, so the origin error may lost, the error is print by netty, which indicate that it can't connect to the server due to the endpoint is empty.
"cluster No server chosen by com.mongodb.reactivestreams.client.internal.ClientSessionHelper$$Lambda$2485/0x000000080169b440@23525adb from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=10.209.12.203:27017, type=UNKNOWN, state=CONNECTING, exception=

{com.mongodb.MongoSocketWriteException: Exception sending message}

, caused by {io.netty.channel.StacklessClosedChannelException}}]}. Waiting for 30000 ms before timing out"

Comment by 夕 一 [ 20/Jan/22 ]


Comment by 夕 一 [ 20/Jan/22 ]

because the driver dosn't print any log, so i can't provider any evidence

Comment by 夕 一 [ 20/Jan/22 ]

i'm using reactive stream driver from 4.0.1 to 4.4.1 the error log always print, proof the issure is always exist

Comment by Jeffrey Yemin [ 20/Jan/22 ]

Hi there, thank you for reaching out.  From your initial description, I don't see any evidence of a bug in the driver. As this seems more like a support issue, I wanted to give you some resources to get this question answered more quickly:

  • Our MongoDB support portal, located at support.mongodb.com
  • Our MongoDB community portal, located here
  • If you are an Atlas customer, you can review your support options by clicking Support in the top menu bar of the Atlas UI

 

Comment by 夕 一 [ 20/Jan/22 ]

i found the screenshot is part of my code. so i comment my source code

Comment by 夕 一 [ 20/Jan/22 ]

import arrow.core.Either
import arrow.core.None
import arrow.core.Option
import arrow.core.getOrElse
import cn.dbn.userCenter.sphere.shared.IMongoManager
import com.mongodb.client.model.UpdateOptions
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import com.mongodb.reactivestreams.client.MongoDatabase
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.*
import org.bson.Document
import org.bson.conversions.Bson
import org.slf4j.LoggerFactory
 
/**
 * Created by petzold on 2016/11/7.
 */
class MongoManager(private val client: MongoClient, private val database: String): IMongoManager {
    private val logger = LoggerFactory.getLogger(MongoManager::class.java)
 
    override suspend fun <T> queryOption(func: suspend MongoDatabase.() -> Option<T>): Option<T> = try {
        func(client.getDatabase(database))
    } catch (e: Exception) {
        logger.error("请求Mongo出现异常", e)
        None
    }
 
    override suspend fun <T> query(collection: String, fn: suspend MongoCollection<Document>.() -> T): Either<Throwable, T> = try {
        Either.Right(fn(client.getDatabase(database).getCollection(collection)))
    } catch (e: Throwable) {
        logger.error("请求Mongo出现异常", e)
        Either.Left(e)
    }
 
    override suspend fun <T, R> query(collection: String, t: T, fn: suspend MongoCollection<Document>.(T) -> R): Either<Throwable, R> = try {
        Either.Right(fn(client.getDatabase(database).getCollection(collection), t))
    } catch (e: Throwable) {
        logger.error("请求Mongo出现异常", e)
        Either.Left(e)
    }
 
    override suspend fun <T> find(collection: String, func: suspend Document.() -> T): Either<Throwable, List<T>> = query(collection) {
        this.find().asFlow().map(func).toList()
    }
 
    override suspend fun <T> find(collection: String, query: Bson, projection: Bson, sort: Bson, func: suspend Document.() -> T): Either<Throwable, List<T>> = query(collection, query){
        this.find(it).projection(projection).sort(sort).asFlow().map(func).toList()
    }
 
    override suspend fun <T> find(collection: String, query: Bson, projection: Bson, func: suspend Document.() -> T): Either<Throwable, List<T>> = query(collection, query){
        this.find(it).projection(projection).asFlow().map(func).toList()
    }
 
    override suspend fun <T> find(collection: String, query: Bson, func: suspend Document.() -> T): Either<Throwable, List<T>> = query(collection, query){
        this.find(it).asFlow().map(func).toList()
    }
 
    override suspend fun <T> findOne(collection: String, query: Bson, projection: Bson, func: suspend Document.() -> T?): Option<T> = queryOption{
        Option.fromNullable(this.getCollection(collection).find(query).projection(projection).awaitFirstOrNull()).flatMap { Option.fromNullable(func(it)) }
    }
 
    override suspend fun <T> findOne(collection: String, query: Bson, func: suspend Document.() -> T?): Option<T> = queryOption {
        Option.fromNullable(this.getCollection(collection).find(query).awaitFirstOrNull()).flatMap { Option.fromNullable(func(it)) }
    }
 
    override suspend fun exist(collection: String, query: Bson): Boolean = query(collection, query) {
        countDocuments(it).awaitSingle() >= 1L
    }.getOrElse { false }
 
    override suspend fun exec(collection: String, fn: suspend MongoCollection<Document>.() -> Unit) {
        try {
            val db = client.getDatabase(database)
            fn(db.getCollection(collection))
        } catch (e: Exception) {
            logger.error("请求Mongo出现异常", e)
        }
    }
 
    override suspend fun <T> exec(collection: String, t: T, fn: suspend MongoCollection<Document>.(T) -> Unit) {
        try {
            fn(client.getDatabase(database).getCollection(collection), t)
        } catch (e: Exception) {
            logger.error("请求Mongo出现异常", e)
        }
    }
 
    override suspend fun <T1, T2> exec(collection: String, t1: T1, t2: T2, fn: suspend MongoCollection<Document>.(T1, T2) -> Unit) {
        try {
            val db = client.getDatabase(database)
            fn(db.getCollection(collection), t1, t2)
        } catch (e: Exception) {
            logger.error("请求Mongo出现异常", e)
        }
    }
 
    override suspend fun updateOne(collection: String, query: Bson, doc: Document) {
        exec(collection, Pair(query, doc)) {
            this.updateOne(it.first, Document("\$set", it.second)).awaitSingle()
        }
    }
 
    override suspend fun updateOne(collection: String, query: Bson, doc: Document, option: UpdateOptions) {
        exec(collection, Triple(query, doc, option)) {
            this.updateOne(it.first, Document("\$set", it.second), it.third).awaitSingle()
        }
    }
 
    override suspend fun updateMany(query: Bson, doc: Document, collection: String) {
        exec(collection, Pair(query, doc)){ this.updateMany(it.first, Document("\$set", it.second)).awaitLast() }
    }
 
    override suspend fun insertOne(collection: String, doc: Document) {
        exec(collection, doc) {
            this.insertOne(it).awaitSingle()
        }
    }
 
    override suspend fun saveOne(collection: String, query: Bson, doc: Document) {
        exec(collection, Pair(query, doc)) {
            this.updateOne(it.first, Document("\$set", it.second), UpdateOptions().upsert(true)).awaitSingle()
        }
    }
}
 
 
private suspend fun getRecord(userId: Long): Option<Record> = mongoManager.findOne("user_identity", Filters.eq("_id", userId.toString())) {
        val ls = this.mapList("records") {
            val uids = this.mapList("uid") { Uid(this.getString("id"), this.getLong("time"), this.getString("date")) }
            Log(this.getString("id").toLong(), this.getString("l3id")?.toLong() ?: 0L, this.getLong("time"), uids)
        }
        Record(this.getString("current").toLong(), this.getString("date"), ls)
    }
 

Generated at Thu Feb 08 09:02:06 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.