21 Mar 2012

I have messages on a queue but they don't get dispatched to the connected consumer.



Another somewhat tricky lesson learned on ActiveMQ.

Suppose you have a network of two broker instances.
Your JMS clients (i.e. producers and consumers) connect more or less randomly to one of the two brokers at runtime. That way, load is distributed across both brokers.
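
For reference, a two-broker network of this kind is usually set up with a network connector in each broker's XML configuration. The following is only a minimal sketch for the first broker; the connector name and the host name brokerB-host are hypothetical placeholders, and the second broker would carry the mirror-image entry pointing back at the first.

```xml
<networkConnectors>
  <!-- hypothetical connector name and host; replace with your own -->
  <networkConnector name="bridge-to-brokerB" uri="static:(tcp://brokerB-host:61616)" />
</networkConnectors>
```

Clients can then spread themselves over both brokers with a failover URL such as failover:(tcp://brokerA-host:61616,tcp://brokerB-host:61616), which by default picks one of the listed brokers at random.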

Because consumers connect to a broker randomly, you're smart and configure replayWhenNoConsumers using the following policy configuration:


<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  </networkBridgeFilterFactory>
</policyEntry>


With replayWhenNoConsumers, messages that were forwarded across the broker network can be replayed back to the original broker when the broker currently holding them has no consumers connected. This is a good thing to do when your consumers randomly pick a broker at runtime and when the number of consumers per destination is rather low.


Now at runtime you may occasionally observe that on one broker instance a particular queue has a QueueSize greater than 0 and a consumer connected, but the messages do not get dispatched to the connected consumer.

In addition when you try to browse the queue using either the ActiveMQ web console or jconsole, no messages are shown. The browse command simply returns an empty set of messages.

Restarting the consumer does not get these messages dispatched. Restarting the broker, however, does help, and all remaining messages get dispatched correctly.



What's going on?

A broker by default has duplicate detection turned on. This is to prevent a producer from sending the same message twice within a small time window (e.g. due to a network fault and producer reconnect). It's the combination of duplicate detection and replayWhenNoConsumers that is causing this behavior.


Let's use a little example to illustrate this in more detail:

The broker network consists of two broker instances, brokerA and brokerB.

Consider a producer connecting to brokerA and sending a few messages to queue1. Slightly later a consumer connects to brokerB. BrokerA forwards all the messages on queue1 to brokerB. BrokerB dispatches the messages to the connected consumer.

Let's assume the consumer disconnects from brokerB before it has processed all messages. Shortly thereafter the consumer (or a new consumer) reconnects, this time to brokerA, ready to process any messages on queue1. BrokerA itself does not have any messages on queue1 stored locally, as it passed them all to brokerB.

However, thanks to the replayWhenNoConsumers policy configuration, brokerB will replay all remaining messages on queue1 back to brokerA. Without replayWhenNoConsumers the messages would not be replayed back to brokerA and would instead be stuck on brokerB until a consumer reconnects to brokerB and consumes them.

As these messages are replayed back to brokerA within a short time frame, the duplicate detection in the broker's cursor on brokerA detects them as duplicates: brokerA initially received these messages, with the same message ids, from the connected producer. Because they are seen as duplicates, they won't get dispatched to the consumer, even though they are already enqueued on the queue, which is why QueueSize is greater than 0.

You can't browse these messages either, because from a broker's point of view a queue browser is just another JMS client, and it is the same cursor that dispatches messages to a queue browser.


The solution is to disable duplicate detection in the cursor:


<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" enableAudit="false">
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  </networkBridgeFilterFactory>
</policyEntry>


With duplicate detection disabled in the cursor, replayed messages are no longer marked as duplicates, so they get dispatched correctly to any local consumers. The broker additionally has producer-side duplicate detection turned on by default in the persistence adapter, so duplicate messages sent by external producers due to a failover reconnect will still be detected!
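
As a rough sketch of where that producer-side audit lives, assuming KahaDB is the persistence adapter: to my knowledge the relevant attributes are failoverProducersAuditDepth and maxFailoverProducersToTrack, and the values below are what I believe to be the defaults. Treat both the names and the numbers as assumptions to verify against the KahaDB documentation for your version.

```xml
<persistenceAdapter>
  <!-- assumed attribute names and default values; verify for your ActiveMQ version -->
  <kahaDB directory="${activemq.data}/kahadb"
          failoverProducersAuditDepth="2048"
          maxFailoverProducersToTrack="64" />
</persistenceAdapter>
```

This audit is separate from the cursor audit that enableAudit="false" switches off in the policy entry.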


When messages get replayed to a broker over the network bridge, these messages are not duplicates. So disabling duplicate detection will in general not cause any duplicate messages over the bridge. Only "in general", because if the network bridge between the two broker instances dies before a message got acked, the bridge is re-established later and the same message is resent. So in case the bridge dies while messages are replayed back, we could potentially receive duplicate messages.

If that is a problem you can set auditNetworkProducers=true on the transport connector configuration to also enable producer side duplicate detection over the network bridge.


<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" auditNetworkProducers="true" />
</transportConnectors>



This will detect those messages as duplicates as well. Setting auditNetworkProducers=true, however, is not recommended when bridging virtual destinations. Also, this property is only exposed in the broker's XML configuration from ActiveMQ 5.6 onwards; earlier versions cannot enable it.


Update, 23.10.2013:
We just found a bug in ActiveMQ versions 5.8.0 to 5.10-SNAPSHOT (but very likely also in older versions) where you could get into the situation described above despite setting replayWhenNoConsumers=true and enableAudit=false. This bug is logged at ENTMQ-444 and has a JUnit test attached.


4 comments:

Anonymous said...

A million thanks to you as I was going crazy with this issue. My producer is stuck after sending message and not getting any reply back and finally timing out on the receive() call. Your solution of replayConsumers and auditenabled attributes helped to send the messages back to the origin broker where most of my consumers were listening in a random failover url. However, i need to perform more testing with this.

B o d h a y a n said...

I have set both the replayWhenNoConsumers="true" and enableAudit="false". But, still I see pending messages in the brokers. I have a network of six brokers. And JMS clients can connect to only four of them. I am also getting "Duplicate message add attempt rejected. Destination: ". Is there anything else I need to do?

Thanks,
Bodhayan.

Torsten Mielke said...

Hello Bodhayan,

Presuming your persistence adapter is kahadb and it has concurrentStoreAndDispatchQueues=true enabled by default I suggest to disable this, i.e. concurrentStoreAndDispatchQueues=false.

Just recently I noticed the same behavior while testing and resolved it by the above change.

Gary Tully said...

note: auditNetworkProducers=true only works when there is a single producer in the network. So the default(false) is sensible in most cases. Then duplicate suppression is left to the store or consumers audit.