[JAVA-5167] kotlinx.datetime.instant & time series collections Created: 18/Sep/23  Updated: 30/Oct/23  Resolved: 30/Oct/23

Status: Closed
Project: Java Driver
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Question Priority: Unknown
Reporter: Carl Holwell Assignee: Ross Lawley
Resolution: Works as Designed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
is duplicated by JAVA-5218 kotlinx.datetime instant & time serie... Closed
Problem/Incident
is caused by JAVA-5218 kotlinx.datetime instant & time serie... Closed
Documentation Changes Summary:

1. What would you like to communicate to the user about this feature?
2. Would you like the user to see examples of the syntax and/or executable code and its output?
3. Which versions of the driver/connector does this apply to?


 Description   

Summary

Hello, I am trying to write a data class to a time series collection where the `timeField` has name 'timestamp' and is of type:

kotlinx.datetime.instant

 

I have created a codec which simply encodes 

writer.writeDateTime(value.toEpochMilliseconds())

however in the logs I see the resulting document:

"documents": [{"timestamp": "2023-09-18T22:36:28Z", ...}]

accompanied by the following error message:

'timestamp' must be present and contain a valid BSON UTC datetime value

In fact, the only way I can get this to work is if I use kotlinx.serialization and provide a java instant!

I don't know if there is a driver issue here, but certainly this looks like it should work? If there is a mistake here I think it would be great if the documentation could provide a more comprehensive example!

 

Driver Version

4.10.1



 Comments   
Comment by Ross Lawley [ 30/Oct/23 ]

Hi dev@noisysoftware.com,

That makes sense. As bson.kotlinx is an optional dependency it already is opt-in; firstly by including the library on the class path and secondly by decorating data classes as @Serializable.

As kotlinx serialization is an alternative serialization framework top codecs, once using kotlinx serialization there is no way to break out and use codecs for some fields / properties.

The fix in this scenario would be to use a custom serializer for the field to ensure it is encoded to the correct bson type.

All the best,

Ross

Comment by Carl Holwell [ 27/Oct/23 ]

JIRA not being friendly, couldn't reopen the old issue and the description I added to the duplicate got replaced by the template which I now can't edit
 
I discovered what was causing the issue in the linked report
 
A transitive dependency on `bson.kotlinx` was causing it to try use kotlinx serialization rather than the codec I provided...
 
This all seems fairly opaque, would it be better for the serialization to be explicit opt in?

Comment by PM Bot [ 18/Oct/23 ]

There hasn't been any recent activity on this ticket, so we're resolving it. Thanks for reaching out! Please feel free to reopen this ticket if you're still experiencing the issue, and add a comment if you're able to provide more information.

Comment by PM Bot [ 10/Oct/23 ]

Hi dev@noisysoftware.com! JAVA-5167 is awaiting your response.

If this is still an issue for you, please open Jira to review the latest status and provide your feedback. Thanks!

Comment by Ross Lawley [ 03/Oct/23 ]

Hi dev@noisysoftware.com,

This is strange, especially as the previous example works. All I can suggest is to ensure that the values are expected.

One way to do that would be to look at the extended json produced by the class:

println(BsonDocumentWrapper.asBsonDocument(leq, codecRegistry).toJson())

That may provide more insight.

Ross

Comment by Carl Holwell [ 26/Sep/23 ]

Initialisation:

single<MongoClient> {
    val config = get<ApplicationConfig>()
    MongoClient.create("mongodb://${config.mongoInstanceHost}:${config.mongoInstancePort}")
}
 
single<MongoDatabase> {
    val config = get<ApplicationConfig>()
    get<MongoClient>().getDatabase(config.mongoDatabase).withCodecRegistry(
        CodecRegistries.fromRegistries(
            CodecRegistries.fromCodecs(InstantCodec()),
            MongoClientSettings.getDefaultCodecRegistry()
        )
    )
} 

 

 

insert class:

@Serializable
data class Leq(
    val timestamp: Instant,
    @Contextual val _clientId: ObjectId,
    @Contextual val _soundLevelMeterId: ObjectId,
    val value: Double,
    val weighting: Weighting
)
 

 

usage:

 

class MongoLeqDatabase(
    private val db: MongoDatabase,
    private val dispatchers: DispatcherProvider,
    private val log: Logger
) : LeqDatabase {
 
    private fun getLeqCollection(_soundLevelMeterId: String): MongoCollection<Leq> = db.getCollection<Leq>(
        collectionName = "leqs_$_soundLevelMeterId"
    )
    
    override suspend fun save(leq: Leq) {
        dispatchers.io.run {
            //todo not working https://jira.mongodb.org/browse/JAVA-5167
            getLeqCollection(_soundLevelMeterId = leq._soundLevelMeterId.toHexString()).insertOne(document = leq)
        }
    }
 
    override suspend fun initialise(_soundLevelMeterId: ObjectId) {
        dispatchers.io.run {
            if (db.listCollectionNames().toList().contains("leqs_$_soundLevelMeterId")) return@run
            log.info("Creating collection leqs_$_soundLevelMeterId")
            db.createCollection(collectionName = "leqs_$_soundLevelMeterId", createCollectionOptions = LeqTimeSeriesCollectionOptions)
        }
    }
 
    companion object {
        private val LeqTimeSeriesCollectionOptions = CreateCollectionOptions()
            .timeSeriesOptions(TimeSeriesOptions("timestamp").metaField("_soundLevelMeterId").granularity(SECONDS))
            .expireAfter(90, TimeUnit.DAYS)
    }
}
 

 

Please let me know if there is anything else I can do / provide to help.

Comment by Carl Holwell [ 26/Sep/23 ]

Stack trace:

Exception in thread "DefaultDispatcher-worker-5" com.mongodb.MongoWriteException: Write operation error on server 127.0.0.1:27017. Write error: WriteError{code=2, message=''timestamp' must be present and contain a valid BSON UTC datetime value', details={}}.	at com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.lambda$createSingleWriteRequestMono$24(MongoOperationPublisher.java:450)	at reactor.core.publisher.Mono.lambda$onErrorMap$27(Mono.java:3749)	at reactor.core.publisher.Mono.lambda$onErrorResume$29(Mono.java:3839)	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)	at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)	at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)	at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)	at com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.lambda$sinkToCallback$30(MongoOperationPublisher.java:498)	at com.mongodb.reactivestreams.client.internal.OperationExecutorImpl.lambda$execute$9(OperationExecutorImpl.java:126)	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47)	at com.mongodb.internal.operation.CommandOperationHelper.lambda$exceptionTransformingCallback$27(CommandOperationHelper.java:648)	at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97)	at com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier$RetryingCallback.onResult(RetryingAsyncCallbackSupplier.java:111)	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47)	at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97)	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47)	at com.mongodb.internal.async.function.AsyncCallbackSupplier.lambda$whenComplete$1(AsyncCallbackSupplier.java:97)	at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$9(MixedBulkWriteOperation.java:349)	at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:85)	at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:62)	at com.mongodb.internal.async.function.LoopState.breakAndCompleteIf(LoopState.java:113)	at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$8(MixedBulkWriteOperation.java:300)	at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:83)	at com.mongodb.internal.async.function.AsyncCallbackLoop$LoopingCallback.onResult(AsyncCallbackLoop.java:62)	at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$executeBulkWriteBatchAsync$7(MixedBulkWriteOperation.java:322)	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47)	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.lambda$executeAsync$0(DefaultServer.java:237)	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47)	at com.mongodb.internal.connection.CommandProtocolImpl.lambda$executeAsync$0(CommandProtocolImpl.java:88)	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.lambda$sendAndReceiveAsync$1(DefaultConnectionPool.java:756)	at com.mongodb.internal.connection.UsageTrackingInternalConnection.lambda$sendAndReceiveAsync$1(UsageTrackingInternalConnection.java:155)	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:47)	at com.mongodb.internal.connection.InternalStreamConnection.lambda$sendCommandMessageAsync$0(InternalStreamConnection.java:553)	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:847)	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:810)	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:669)	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:666)	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:251)	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:234)	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)	at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160)	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573)	at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)	at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:147)	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:119)	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:108)	at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:666)	at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:96)	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:800)	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:784)	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:669)	at com.mongodb.internal.connection.InternalStreamConnection$3.completed(InternalStreamConnection.java:666)	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:251)	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:234)	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)	at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:221)	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)	at java.base/java.lang.Thread.run(Thread.java:833)	Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@16dc878a, Dispatchers.Default] 

Comment by Carl Holwell [ 26/Sep/23 ]

import com.mongodb.MongoClientSettings
import com.mongodb.client.model.CreateCollectionOptions
import com.mongodb.client.model.TimeSeriesGranularity
import com.mongodb.client.model.TimeSeriesOptions
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.runBlocking
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.Contextual
import kotlinx.serialization.Serializable
import org.bson.BsonReader
import org.bson.BsonWriter
import org.bson.codecs.Codec
import org.bson.codecs.DecoderContext
import org.bson.codecs.EncoderContext
import org.bson.codecs.configuration.CodecRegistries
import org.bson.types.ObjectId
 
class InstantCodec: Codec<Instant> {
 
    override fun encode(writer: BsonWriter, value: Instant, encoderContext: EncoderContext) {
        writer.writeDateTime(value.toEpochMilliseconds())
    }
 
    override fun getEncoderClass(): Class<Instant> = Instant::class.java
 
    override fun decode(reader: BsonReader, decoderContext: DecoderContext): Instant =
        Instant.fromEpochMilliseconds(reader.readDateTime())
}
 
@Serializable
data class Foo(
    val timestamp: Instant,
    @Contextual val refId: ObjectId,
    val value: Double,
)
 
runBlocking {
    val mongoClient = MongoClient.create("mongodb://127.0.0.1:27017")
    val mongoDatabase = mongoClient.getDatabase("database").withCodecRegistry(
        CodecRegistries.fromRegistries(
            CodecRegistries.fromCodecs(InstantCodec()),
            MongoClientSettings.getDefaultCodecRegistry()
        )
    )
    val createCollectionOptions = CreateCollectionOptions().timeSeriesOptions(TimeSeriesOptions("timestamp").metaField("refId").granularity(TimeSeriesGranularity.SECONDS))
    mongoDatabase.createCollection("ts", createCollectionOptions)
    val collection = mongoDatabase.getCollection<Foo>("ts")
 
    collection.insertOne(Foo(Clock.System.now(), ObjectId(), 1.0))
}
 

The above is a representative example, however confusingly this works but identical code in my project does not.

I am not depending on `bson-kotlinx` explicitly but at a guess the codec for serializable types is being used via transitive dependency?

 

Edit: I have just tried mapping to a non serialisable equivalent and it still does not work in my application

Comment by Ross Lawley [ 26/Sep/23 ]

Hi dev@noisysoftware.com,

The codec looks fine - its unclear as to the cause of the exception without a stacktrace or a larger reproducible example.

Ross

Comment by Carl Holwell [ 26/Sep/23 ]

Sorry I should have been more clear, I created a codec as follows:

private class InstantCodec: Codec<Instant> {
    
    override fun encode(writer: BsonWriter, value: Instant, encoderContext: EncoderContext) {
        writer.writeDateTime(value.toEpochMilliseconds())
    }
 
    override fun getEncoderClass(): Class<Instant> = Instant::class.java
 
    override fun decode(reader: BsonReader, decoderContext: DecoderContext): Instant =
        Instant.fromEpochMilliseconds(reader.readDateTime())
} 

which is what I was using to reproduce the problem.

Comment by Ross Lawley [ 26/Sep/23 ]

Hi dev@noisysoftware.com,

Unfortunately, kotlinx.datetime.instant is a separate dependency so there isn't any default support in the driver. The best approach would be a custom codec similiar to InstantCodec

Ross

Comment by Carl Holwell [ 26/Sep/23 ]

Hi ross@mongodb.com 

I am not using `bson-kotlinx`

Happy to provide more code but would rather not post on JIRA, is there another way I can share it? 

Thanks

Comment by Ross Lawley [ 26/Sep/23 ]

Hi dev@noisysoftware.com,

Are you using bson-kotlinx for serialization? It maybe that extra work is required to add support for this type, although I'm unsure what the difference is to a Java Instant (perhaps availabilty for Kotlin Multiplatform).

Not sure about the codec issue - would need to see more code inorder to help diagnose. Do you have any code you can share?

Ross

Comment by PM Bot [ 18/Sep/23 ]

Hi dev@noisysoftware.com, thank you for reporting this issue! The team will look into it and get back to you soon.

Generated at Thu Feb 08 09:03:55 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.