  1. Java Driver
  2. JAVA-923

Storing Spring Integration Messages in MongoDB

    • Type: Bug
    • Resolution: Done
    • Priority: Major - P3
    • None
    • Affects Version/s: None
    • Component/s: None
    • Labels:
    • Environment:
      Spring Integration,

      I am adding Spring Integration messages with mongoTemplate.insert(Message m, "collection"). The message contains many different fields, such as:

      {dispatchAsync=protected boolean org.apache.activemq.ActiveMQConnection.dispatchAsync, transportInterruptionProcessingComplete=protected volatile java.util.concurrent.CountDownLatch org.apache.activemq.ActiveMQConnection.transportInterruptionProcessingComplete, optimizedAckScheduledAckInterval=private long org.apache.activemq.ActiveMQConnection.optimizedAckScheduledAckInterval, nestedMapAndListEnabled=private boolean org.apache.activemq.ActiveMQConnection.nestedMapAndListEnabled, userSpecifiedClientID=private boolean org.apache.activemq.ActiveMQConnection.userSpecifiedClientID, exclusiveConsumer=private boolean org.apache.activemq.ActiveMQConnection.exclusiveConsumer, consumerIdGenerator=private final org.apache.activemq.util.LongSequenceGenerator org.apache.activemq.ActiveMQConnection.consumerIdGenerator, producerWindowSize=private int org.apache.activemq.ActiveMQConnection.producerWindowSize, info=private final org.apache.activemq.command.ConnectionInfo org.apache.activemq.ActiveMQConnection.info, connectionAudit=private final org.apache.activemq.ConnectionAudit org.apache.activemq.ActiveMQConnection.connectionAudit, tempDestinationIdGenerator=private final org.apache.activemq.util.LongSequenceGenerator org.apache.activemq.ActiveMQConnection.tempDestinationIdGenerator, disableTimeStampsByDefault=private boolean org.apache.activemq.ActiveMQConnection.disableTimeStampsByDefault, useDedicatedTaskRunner=private boolean org.apache.activemq.ActiveMQConnection.useDedicatedTaskRunner, optimizeAcknowledge=private boolean org.apache.activemq.ActiveMQConnection.optimizeAcknowledge, transformer=private org.apache.activemq.MessageTransformer org.apache.activemq.ActiveMQConnection.transformer, clientInternalExceptionListener=private org.apache.activemq.ClientInternalExceptionListener org.apache.activemq.ActiveMQConnection.clientInternalExceptionListener, producers=private final java.util.concurrent.ConcurrentHashMap org.apache.activemq.ActiveMQConnection.producers, 
advisoryConsumer=private org.apache.activemq.AdvisoryConsumer org.apache.activemq.ActiveMQConnection.advisoryConsumer, checkForDuplicates=private boolean org.apache.activemq.ActiveMQConnection.checkForDuplicates, transportFailed=private final java.util.concurrent.atomic.AtomicBoolean org.apache.activemq.ActiveMQConnection.transportFailed, scheduler=private org.apache.activemq.thread.Scheduler org.apache.activemq.ActiveMQConnection.scheduler, maxThreadPoolSize=private int org.apache.activemq.ActiveMQConnection.maxThreadPoolSize, transactedIndividualAck=private boolean org.apache.activemq.ActiveMQConnection.transactedIndividualAck, factoryStats=private final org.apache.activemq.management.JMSStatsImpl org.apache.activemq.ActiveMQConnection.factoryStats, timeCreated=private final long org.apache.activemq.ActiveMQConnection.timeCreated, queueOnlyConnection=private boolean org.apache.activemq.ActiveMQConnection.queueOnlyConnection, blobTransferPolicy=private org.apache.activemq.blob.BlobTransferPolicy org.apache.activemq.ActiveMQConnection.blobTransferPolicy, sendAcksAsync=private boolean org.apache.activemq.ActiveMQConnection.sendAcksAsync, producerIdGenerator=private final org.apache.activemq.util.LongSequenceGenerator org.apache.activemq.ActiveMQConnection.producerIdGenerator, consumerFailoverRedeliveryWaitPeriod=private long org.apache.activemq.ActiveMQConnection.consumerFailoverRedeliveryWaitPeriod, protocolVersion=private final java.util.concurrent.atomic.AtomicInteger org.apache.activemq.ActiveMQConnection.protocolVersion, sendTimeout=private int org.apache.activemq.ActiveMQConnection.sendTimeout, transportListeners=private final java.util.concurrent.CopyOnWriteArrayList org.apache.activemq.ActiveMQConnection.transportListeners, connectionSessionId=private final org.apache.activemq.command.SessionId org.apache.activemq.ActiveMQConnection.connectionSessionId, brokerInfoReceived=private final java.util.concurrent.CountDownLatch 
org.apache.activemq.ActiveMQConnection.brokerInfoReceived, optimizedMessageDispatch=private boolean org.apache.activemq.ActiveMQConnection.optimizedMessageDispatch, localTransactionIdGenerator=private final org.apache.activemq.util.LongSequenceGenerator org.apache.activemq.ActiveMQConnection.localTransactionIdGenerator, firstFailureError=private java.io.IOException org.apache.activemq.ActiveMQConnection.firstFailureError, stats=private final org.apache.activemq.management.JMSConnectionStatsImpl org.apache.activemq.ActiveMQConnection.stats, inputStreams=private final java.util.concurrent.CopyOnWriteArrayList org.apache.activemq.ActiveMQConnection.inputStreams, optimizeAcknowledgeTimeOut=private long org.apache.activemq.ActiveMQConnection.optimizeAcknowledgeTimeOut, objectMessageSerializationDefered=private boolean org.apache.activemq.ActiveMQConnection.objectMessageSerializationDefered, redeliveryPolicyMap=private org.apache.activemq.broker.region.policy.RedeliveryPolicyMap org.apache.activemq.ActiveMQConnection.redeliveryPolicyMap, nonBlockingRedelivery=private boolean org.apache.activemq.ActiveMQConnection.nonBlockingRedelivery, exceptionListener=private javax.jms.ExceptionListener org.apache.activemq.ActiveMQConnection.exceptionListener, clientIdGenerator=private final org.apache.activemq.util.IdGenerator org.apache.activemq.ActiveMQConnection.clientIdGenerator, copyMessageOnSend=private boolean org.apache.activemq.ActiveMQConnection.copyMessageOnSend, sessionTaskRunner=private org.apache.activemq.thread.TaskRunnerFactory org.apache.activemq.ActiveMQConnection.sessionTaskRunner, outputStreams=private final java.util.concurrent.CopyOnWriteArrayList org.apache.activemq.ActiveMQConnection.outputStreams, rejectedTaskHandler=private java.util.concurrent.RejectedExecutionHandler org.apache.activemq.ActiveMQConnection.rejectedTaskHandler, clientIDSet=private boolean org.apache.activemq.ActiveMQConnection.clientIDSet, transport=private final 
org.apache.activemq.transport.Transport org.apache.activemq.ActiveMQConnection.transport, useCompression=private boolean org.apache.activemq.ActiveMQConnection.useCompression, dispatchers=private final java.util.concurrent.ConcurrentHashMap org.apache.activemq.ActiveMQConnection.dispatchers, closed=private final java.util.concurrent.atomic.AtomicBoolean org.apache.activemq.ActiveMQConnection.closed, warnAboutUnstartedConnectionTimeout=private long org.apache.activemq.ActiveMQConnection.warnAboutUnstartedConnectionTimeout, useRetroactiveConsumer=private boolean org.apache.activemq.ActiveMQConnection.useRetroactiveConsumer, sessions=private final java.util.concurrent.CopyOnWriteArrayList org.apache.activemq.ActiveMQConnection.sessions, isConnectionInfoSentToBroker=private boolean org.apache.activemq.ActiveMQConnection.isConnectionInfoSentToBroker, closeTimeout=private int org.apache.activemq.ActiveMQConnection.closeTimeout, watchTopicAdvisories=private boolean org.apache.activemq.ActiveMQConnection.watchTopicAdvisories, alwaysSyncSend=private boolean org.apache.activemq.ActiveMQConnection.alwaysSyncSend, executor=private final java.util.concurrent.ThreadPoolExecutor org.apache.activemq.ActiveMQConnection.executor, connectionConsumers=private final java.util.concurrent.CopyOnWriteArrayList org.apache.activemq.ActiveMQConnection.connectionConsumers, ensureConnectionInfoSentMutex=private final java.lang.Object org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSentMutex, prefetchPolicy=private org.apache.activemq.ActiveMQPrefetchPolicy org.apache.activemq.ActiveMQConnection.prefetchPolicy, brokerInfo=private org.apache.activemq.command.BrokerInfo org.apache.activemq.ActiveMQConnection.brokerInfo, activeTempDestinations=public final java.util.concurrent.ConcurrentHashMap org.apache.activemq.ActiveMQConnection.activeTempDestinations, messagePrioritySupported=private boolean org.apache.activemq.ActiveMQConnection.messagePrioritySupported, destinationSource=private 
org.apache.activemq.advisory.DestinationSource org.apache.activemq.ActiveMQConnection.destinationSource, started=private final java.util.concurrent.atomic.AtomicBoolean org.apache.activemq.ActiveMQConnection.started, sessionIdGenerator=private final org.apache.activemq.util.LongSequenceGenerator org.apache.activemq.ActiveMQConnection.sessionIdGenerator, useAsyncSend=private boolean org.apache.activemq.ActiveMQConnection.useAsyncSend, alwaysSessionAsync=protected boolean org.apache.activemq.ActiveMQConnection.alwaysSessionAsync, closing=private final java.util.concurrent.atomic.AtomicBoolean org.apache.activemq.ActiveMQConnection.closing}

      While debugging, I stepped into MappingMongoConverter. For the sessionTaskRunner field, Object propertyObj = wrapper.getProperty(prop, prop.getType(), fieldAccessOnly) returns an org.apache.activemq.thread.TaskRunnerFactory, and then a MessageTransformationException is thrown.

      This is the stack trace:
      13.08.2013 16:27:52.782 [AbstractMessageListenerContainer.java] [DEBUG] org.springframework.jms.listener.DefaultMessageListenerContainer#0-1 Listener exception after container shutdown
      org.springframework.integration.dispatcher.AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed. Multiple causes:
      failed to transform message
      java.lang.NullPointerException
      See below for the stacktrace of the first cause.
      at org.springframework.integration.dispatcher.UnicastingDispatcher.handleExceptions(UnicastingDispatcher.java:165) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:128) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.core.MessagingTemplate.convertAndSend(MessagingTemplate.java:189) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:183) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:423) ~[spring-integration-jms-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:277) ~[spring-integration-jms-2.2.4.RELEASE.jar:na]
      at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:537) ~[spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:497) ~[spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468) ~[spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325) [spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263) [spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1096) [spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1088) [spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:985) [spring-jms-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at java.lang.Thread.run(Thread.java:662) [na:1.6.0_38]
      Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message
      at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:137) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      ... 18 common frames omitted
      Caused by: java.lang.IllegalArgumentException: Object of class [java.util.concurrent.atomic.AtomicLong] must be an instance of class org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageWrapper
      at org.springframework.util.Assert.isInstanceOf(Assert.java:337) ~[spring-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.util.Assert.isInstanceOf(Assert.java:319) ~[spring-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
      at org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageReadingMongoConverter.write(MongoDbMessageStore.java:318) ~[spring-integration-mongodb-2.2.4.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.convertToMongoType(MappingMongoConverter.java:867) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.AbstractMongoConverter.convertToMongoType(AbstractMongoConverter.java:101) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.QueryMapper.convertId(QueryMapper.java:265) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:357) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writePropertyInternal(MappingMongoConverter.java:442) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter$3.doWithPersistentProperty(MappingMongoConverter.java:376) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter$3.doWithPersistentProperty(MappingMongoConverter.java:363) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mapping.model.BasicPersistentEntity.doWithProperties(BasicPersistentEntity.java:241) ~[spring-data-commons-1.5.2.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:363) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writePropertyInternal(MappingMongoConverter.java:442) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter$3.doWithPersistentProperty(MappingMongoConverter.java:376) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter$3.doWithPersistentProperty(MappingMongoConverter.java:363) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mapping.model.BasicPersistentEntity.doWithProperties(BasicPersistentEntity.java:241) ~[spring-data-commons-1.5.2.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:363) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writePropertyInternal(MappingMongoConverter.java:442) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter$3.doWithPersistentProperty(MappingMongoConverter.java:376) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter$3.doWithPersistentProperty(MappingMongoConverter.java:363) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mapping.model.BasicPersistentEntity.doWithProperties(BasicPersistentEntity.java:241) ~[spring-data-commons-1.5.2.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:363) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:334) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.convert.MappingMongoConverter.write(MappingMongoConverter.java:299) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageReadingMongoConverter.write(MongoDbMessageStore.java:322) ~[spring-integration-mongodb-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageReadingMongoConverter.write(MongoDbMessageStore.java:299) ~[spring-integration-mongodb-2.2.4.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.MongoTemplate.doInsert(MongoTemplate.java:656) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.data.mongodb.core.MongoTemplate.insert(MongoTemplate.java:613) ~[spring-data-mongodb-1.2.3.RELEASE.jar:na]
      at org.springframework.integration.mongodb.store.MongoDbMessageStore.addMessage(MongoDbMessageStore.java:128) ~[spring-integration-mongodb-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.transformer.ClaimCheckInTransformer.doTransform(ClaimCheckInTransformer.java:50) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33) ~[spring-integration-core-2.2.4.RELEASE.jar:na]
      ... 24 common frames omitted
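
      The trace above suggests that the converter is recursing through a live ActiveMQ connection object graph reachable from the message (the AtomicLong is only hit several property levels down). One possible workaround, which is not confirmed anywhere in this ticket, is to keep only simple-valued headers before the message reaches the message store. The following is a minimal plain-Java sketch under that assumption; HeaderSanitizer, its method names, and the set of types treated as "simple" are hypothetical choices for illustration and are not part of Spring Integration or Spring Data MongoDB.

```java
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a Spring or MongoDB API.
final class HeaderSanitizer {

    private HeaderSanitizer() {
    }

    // Treats only a small, explicit set of types as safe to persist.
    // The check is deliberately not "instanceof Number": AtomicLong is a
    // Number too, and it is the type named in the exception above.
    static boolean isSimpleValue(Object value) {
        return value instanceof String
                || value instanceof Boolean
                || value instanceof Integer
                || value instanceof Long
                || value instanceof Double
                || value instanceof Date;
    }

    // Copies the simple-valued entries into a new map, preserving order.
    // Anything else (for example a connection or session object) is dropped,
    // as are null values.
    static Map<String, Object> keepSimpleHeaders(Map<String, ?> headers) {
        Map<String, Object> result = new LinkedHashMap<String, Object>();
        for (Map.Entry<String, ?> entry : headers.entrySet()) {
            if (isSimpleValue(entry.getValue())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

      The filtered map could then be used to build a new message ahead of the claim-check step. Whether dropping those headers is acceptable depends on what the downstream handlers need, so this is a starting point rather than a fix.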

            Assignee: Unassigned
            Reporter: Lukasz Miroslaw (lmiroslaw)
            Votes: 0
            Watchers: 3
