Introduction
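ActiveMQ uses prefetch to push batches of messages to each consumer ahead of acknowledgment, which improves throughput. When a consumer cannot work through its prefetch buffer fast enough, the broker flags it as a "slow consumer" and, depending on the configured slow consumer strategy, logs warnings or aborts the consumer. Messages parked in that consumer's buffer are unavailable to other consumers, so one slow consumer can degrade performance for the whole queue.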
Symptoms
Slow consumer warning:
```bash
$ tail -f /var/log/activemq/activemq.log
[WARN] Queue://orders.queue - Consumer Consumer:1 has been slow for 30000ms
[WARN] Slow consumer detected: prefetch not being consumed
```
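When these warnings need to be counted or fed into an alert, a small parser helps. The sketch below is a hypothetical helper (the class name `SlowConsumerLogParser` is not part of ActiveMQ) and it assumes the exact message wording of the sample line above; real broker logs may be formatted differently.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls the consumer ID and slow duration out of a warning line. */
final class SlowConsumerLogParser {
    // Assumption: the wording matches the sample warning shown above.
    private static final Pattern SLOW =
            Pattern.compile("Consumer (\\S+) has been slow for (\\d+)ms");

    /** Returns {consumerId, millis} when the line matches, otherwise empty. */
    static Optional<String[]> parse(String line) {
        Matcher m = SLOW.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new String[] {m.group(1), m.group(2)});
    }

    public static void main(String[] args) {
        String line = "[WARN] Queue://orders.queue - Consumer Consumer:1 has been slow for 30000ms";
        parse(line).ifPresent(r -> System.out.println(r[0] + " slow for " + r[1] + " ms"));
    }
}
```

Lines that do not match return an empty result, so the parser can be run over the whole log without pre-filtering.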
Queue backlog:
[WARN] Queue depth exceeded 10000 messages due to slow consumer
[INFO] Prefetch for consumer Consumer:1 reduced from 1000 to 100
Consumer blocked:
[ERROR] Consumer blocked waiting for acknowledge
[WARN] Message backlog growing, producers may be blocked
Common Causes
1. Prefetch too high - Consumer gets more messages than it can handle
2. Message processing slow - Complex business logic or database calls
3. Consumer resource limits - CPU, memory, or thread pool exhausted
4. Network latency - Slow network between broker and consumer
5. Blocking operations - Consumer does blocking I/O in message handler
6. Single-threaded consumer - Can't process concurrent messages
Step-by-Step Fix
Step 1: Identify Slow Consumers
```bash
# List consumers for a queue
curl -u admin:admin "http://localhost:8161/api/jolokia/list/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,consumer=*"

# Check consumer statistics
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/Consumers"

# Via activemq-admin
activemq-admin query --objname type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue --attribute ConsumerCount,QueueSize

# Check prefetch status
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,consumer=Consumer:1/PrefetchSize"
```
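The Jolokia URLs above all follow one pattern: a base URL, the queue's MBean name, and an attribute. As a rough sketch, a monitoring job could assemble them with a helper like the one below. `JolokiaUrls` and `queueAttributeUrl` are hypothetical names, and the code assumes the broker and queue names need no URL escaping.

```java
/** Hypothetical helper that assembles Jolokia read URLs in the pattern used above. */
final class JolokiaUrls {
    /**
     * Builds the read URL for one attribute of a queue MBean.
     * Assumption: brokerName and queue contain no characters that need escaping.
     */
    static String queueAttributeUrl(String baseUrl, String brokerName, String queue, String attribute) {
        String mbean = "org.apache.activemq:type=Broker,brokerName=" + brokerName
                + ",destinationType=Queue,destinationName=" + queue;
        return baseUrl + "/api/jolokia/read/" + mbean + "/" + attribute;
    }

    public static void main(String[] args) {
        System.out.println(queueAttributeUrl("http://localhost:8161", "localhost", "orders.queue", "QueueSize"));
    }
}
```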
Step 2: Reduce Prefetch Size
```java
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

// Set prefetch in connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory.getPrefetchPolicy().setQueuePrefetch(10);  // Reduce from default 1000

// Or in connection URI
ConnectionFactory uriFactory = new ActiveMQConnectionFactory(
    "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10"
);

// For specific consumer
Queue queue = session.createQueue("orders.queue?consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);

// Recommended prefetch values based on message processing time:
// < 10ms per message:   prefetch 100-1000
// 10-100ms per message: prefetch 10-50
// > 100ms per message:  prefetch 1-10
```
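The processing-time guidance above can be captured in a small lookup so that the prefetch value is derived from a measured average rather than guessed. This is only a sketch: `PrefetchAdvisor` is a hypothetical name, and treating exactly 100 ms as part of the middle band is an assumption, since the ranges above leave that boundary open.

```java
/** Hypothetical helper encoding the prefetch ranges recommended above. */
final class PrefetchAdvisor {
    /**
     * Returns {min, max} queue prefetch for an average per-message processing time.
     * Assumption: a time of exactly 100 ms falls into the 10-50 band.
     */
    static int[] recommendedRange(double avgProcessingMillis) {
        if (avgProcessingMillis < 10) {
            return new int[] {100, 1000};
        }
        if (avgProcessingMillis <= 100) {
            return new int[] {10, 50};
        }
        return new int[] {1, 10};
    }

    public static void main(String[] args) {
        int[] range = recommendedRange(50);
        System.out.println("prefetch " + range[0] + "-" + range[1]);
    }
}
```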
Step 3: Configure Consumer Priority
```xml
<!-- In activemq.xml, configure strict priority dispatch -->
<policyEntry queue="orders.queue"
             prioritizedMessages="true"
             useConsumerPriority="true"
             strictOrderDispatch="true">
  <dispatchPolicy>
    <strictOrderDispatchPolicy/>
  </dispatchPolicy>
</policyEntry>

<!-- Consumers with higher priority get messages first -->
<!-- Allows fast consumers to process while slow ones catch up -->
```
```java
// Set consumer priority
Queue queue = session.createQueue("orders.queue?consumer.priority=10");
MessageConsumer consumer = session.createConsumer(queue);

// Priority ranges from 0 to 127
// The broker dispatches to higher-priority consumers first,
// moving to lower priorities once their prefetch buffers are full
```
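To see why priority only helps in combination with a sensible prefetch, it can be useful to model the dispatch order. The sketch below is a deliberately simplified model, not broker code: `PriorityDispatchModel` and `ModelConsumer` are hypothetical names, and it assumes messages go to the highest-priority consumer until its prefetch buffer has no free slots.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of priority-ordered dispatch. */
final class PriorityDispatchModel {
    static final class ModelConsumer {
        final String name;
        final int priority;
        int freeSlots;   // Remaining room in the prefetch buffer

        ModelConsumer(String name, int priority, int freeSlots) {
            this.name = name;
            this.priority = priority;
            this.freeSlots = freeSlots;
        }
    }

    /** Hands out messages highest priority first; returns consumer name -> messages received. */
    static Map<String, Integer> dispatch(List<ModelConsumer> consumers, int messages) {
        List<ModelConsumer> ordered = new ArrayList<>(consumers);
        ordered.sort(Comparator.comparingInt((ModelConsumer c) -> c.priority).reversed());
        Map<String, Integer> assigned = new LinkedHashMap<>();
        int remaining = messages;
        for (ModelConsumer c : ordered) {
            int take = Math.min(c.freeSlots, remaining);
            c.freeSlots -= take;
            remaining -= take;
            assigned.put(c.name, take);
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<ModelConsumer> consumers = new ArrayList<>();
        consumers.add(new ModelConsumer("slow", 0, 5));
        consumers.add(new ModelConsumer("fast", 10, 2));
        System.out.println(dispatch(consumers, 4));
    }
}
```

In this model a high-priority consumer with a large prefetch absorbs everything, so a slow consumer should never be given the highest priority.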
Step 4: Optimize Message Processing
```java
// Before: Slow single-threaded processing
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
    try {
        processMessage(message);  // Blocking, slow operation
        session.commit();
    } catch (JMSException e) {
        log.error("Processing failed", e);
    }
});

// After: Parallel processing with one session and consumer per worker.
// A JMS Session is single-threaded, so it must not be shared with a thread pool.
for (int i = 0; i < 10; i++) {
    Session workerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    MessageConsumer worker = workerSession.createConsumer(workerSession.createQueue("orders.queue"));
    worker.setMessageListener(message -> {
        try {
            processMessage(message);
            workerSession.commit();
        } catch (JMSException e) {
            log.error("Processing failed", e);  // Call workerSession.rollback() to force redelivery
        }
    });
}

// Or use Spring's DefaultMessageListenerContainer
@Bean
public DefaultMessageListenerContainer jmsContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setDestinationName("orders.queue");
    container.setConcurrentConsumers(10);  // Multiple concurrent consumers
    container.setMaxConcurrentConsumers(20);
    container.setSessionTransacted(true);
    return container;
}
```
Step 5: Check Consumer Resource Usage
```bash
# Check consumer process CPU
top -p <consumer_pid>
ps -p <consumer_pid> -o %cpu,%mem,vsz,rss

# Check thread count (nlwp = number of threads in the process)
ps -o nlwp= -p <consumer_pid>

# Check open connections
lsof -p <consumer_pid> | grep TCP

# Check JVM memory (for Java consumers)
jstat -gc <consumer_pid>
jcmd <consumer_pid> GC.heap_info

# If resources exhausted, scale consumers or optimize processing
```
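For Java consumers, the same figures can also be read from inside the process and logged alongside message handling, which avoids looking up the PID. The following is a minimal sketch using standard JDK management APIs; the class name `ConsumerResourceSnapshot` is hypothetical.

```java
import java.lang.management.ManagementFactory;

/** Hypothetical helper reporting thread and heap usage of the current JVM. */
final class ConsumerResourceSnapshot {
    /** Number of live threads in this JVM, including daemon threads. */
    static int liveThreads() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    /** Heap currently in use, in bytes (allocated heap minus free space within it). */
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    public static void main(String[] args) {
        System.out.println("threads=" + liveThreads()
                + " usedHeapMiB=" + usedHeapBytes() / (1024 * 1024)
                + " cpus=" + Runtime.getRuntime().availableProcessors());
    }
}
```

A thread count that keeps climbing, or used heap that stays close to the maximum, points at the resource-limit cause rather than at prefetch.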
Step 6: Use Async Message Processing
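```java
// Auto acknowledgment: the session acknowledges each message automatically
Session autoAckSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Or client acknowledgment for explicit control
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();

// Process message asynchronously.
// Caveats: a JMS Session is not thread-safe, and in CLIENT_ACKNOWLEDGE mode
// acknowledge() covers every message the session has consumed so far.
executor.submit(() -> {
    processMessage(message);
    try {
        message.acknowledge();  // Acknowledge after processing
    } catch (JMSException e) {
        log.error("Acknowledge failed", e);
    }
});
```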
Step 7: Configure Abort Slow Consumers
```xml
<!-- In activemq.xml, configure slow consumer handling -->
<policyEntry queue=">">
  <slowConsumerStrategy>
    <abortSlowConsumerStrategy maxSlowDuration="30000" abortConnection="true"/>
  </slowConsumerStrategy>
</policyEntry>

<!-- maxSlowDuration: Time in ms a consumer may stay slow before it is aborted -->
<!-- abortConnection: Also close consumer connection -->

<!-- Alternative: abort consumers that have not acknowledged recently -->
<policyEntry queue=">">
  <slowConsumerStrategy>
    <abortSlowAckConsumerStrategy maxTimeSinceLastAck="30000"/>
  </slowConsumerStrategy>
</policyEntry>
```
Step 8: Monitor Consumer Lag
```bash
# Check dispatch queue size
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,consumer=Consumer:1/DispatchedQueueSize"

# Check dispatched vs dequeued (acknowledged) counts
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/DispatchCount"
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/DequeueCount"

# If dispatched >> dequeued, messages are sitting unacknowledged and the consumer is slow

# Monitor processing time
# Add timing to consumer code (Java):
#   long start = System.currentTimeMillis();
#   processMessage(message);
#   long elapsed = System.currentTimeMillis() - start;
#   log.info("Processing time: {} ms", elapsed);
```
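Logging every single duration gets noisy on busy queues, so the timing idea above can be wrapped in a small accumulator that tracks count, average, and maximum. This is a sketch using only the JDK; `ProcessingTimer` is a hypothetical class, and the handler passed in stands for whatever the consumer does per message.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical thread-safe accumulator for per-message processing times. */
final class ProcessingTimer {
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong maxNanos = new AtomicLong();

    /** Runs the handler and records how long it took, even if it throws. */
    void time(Runnable handler) {
        long start = System.nanoTime();
        try {
            handler.run();
        } finally {
            long elapsed = System.nanoTime() - start;
            count.incrementAndGet();
            totalNanos.addAndGet(elapsed);
            maxNanos.accumulateAndGet(elapsed, Math::max);
        }
    }

    long count() {
        return count.get();
    }

    /** Average processing time in milliseconds, or 0 when nothing was recorded. */
    double averageMillis() {
        long n = count.get();
        return n == 0 ? 0.0 : totalNanos.get() / (double) n / 1_000_000.0;
    }

    double maxMillis() {
        return maxNanos.get() / 1_000_000.0;
    }

    public static void main(String[] args) {
        ProcessingTimer timer = new ProcessingTimer();
        timer.time(() -> { /* stand-in for processMessage(message) */ });
        System.out.println("count=" + timer.count() + " avgMs=" + timer.averageMillis());
    }
}
```

The average feeds directly into the prefetch ranges from Step 2, while a maximum far above the average usually indicates occasional blocking calls.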
Step 9: Scale Consumer Count
```bash
# Check current consumer count
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/ConsumerCount"

# Add more consumers
# Java: Use concurrent consumers
#   DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
#   container.setConcurrentConsumers(5);
#   container.setMaxConcurrentConsumers(20);

# Or deploy multiple consumer instances
# Kubernetes: Scale consumer deployment
kubectl scale deployment order-consumer --replicas=10

# Ensure consumers share same queue for load balancing
```
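How many consumers are enough can be estimated from the arrival rate and the average processing time: each consumer handles roughly 1000 / processing-time-in-ms messages per second. The sketch below applies that arithmetic. `ConsumerSizing` is a hypothetical name, and the estimate assumes evenly distributed load with no headroom for bursts.

```java
/** Hypothetical helper estimating how many concurrent consumers a queue needs. */
final class ConsumerSizing {
    /**
     * Consumers needed to keep up with the arrival rate.
     * Assumption: load is spread evenly and each consumer processes one message at a time.
     */
    static int requiredConsumers(double arrivalRatePerSecond, double avgProcessingMillis) {
        if (arrivalRatePerSecond < 0 || avgProcessingMillis < 0) {
            throw new IllegalArgumentException("rate and processing time must be non-negative");
        }
        double busyConsumers = arrivalRatePerSecond * avgProcessingMillis / 1000.0;
        return (int) Math.max(1, Math.ceil(busyConsumers));
    }

    public static void main(String[] args) {
        // 200 msg/s at 50 ms each keeps 10 consumers fully busy.
        System.out.println(requiredConsumers(200, 50));
    }
}
```

In practice the result is a lower bound; adding some spare capacity on top lets the backlog drain after a spike.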
Step 10: Check Network Performance
```bash
# Test network latency to broker
ping activemq-host

# Check network interface stats (errors, drops)
netstat -i

# Measure message throughput
# On broker:
activemq-admin query --objname type=Broker --attribute TotalMessageCount,TotalDequeueCount

# Monitor connection bandwidth
iftop -i eth0

# If high latency, increase prefetch and reduce acknowledgments
# Or move consumers closer to broker
```
Consumer Performance Settings
| Setting | Default | Slow Consumer Recommended |
|---|---|---|
| prefetchPolicy.queuePrefetch | 1000 | 10-100 |
| concurrentConsumers | 1 | 5-20 |
| abortSlowConsumerStrategy | Not configured | maxSlowDuration 30000 ms |
| consumer.priority | 0 | Higher for fast consumers |
Verification
```bash
# After configuration changes
# Restart consumers

# Check consumer dispatch rate
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/DequeueCount"

# Watch for slow consumer warnings
tail -f /var/log/activemq/activemq.log | grep -i "slow consumer"
# Should show no warnings

# Check queue depth
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/QueueSize"
# Should be stable or decreasing

# Monitor consumer statistics
activemq-admin query --objname type=Broker,destinationType=Queue --attribute QueueSize,ConsumerCount,DequeueCount
```
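The "stable or decreasing" check on queue depth is easy to automate once a few `QueueSize` readings have been collected. The sketch below is one strict interpretation; `QueueDepthTrend` is a hypothetical name, and the samples are assumed to be in chronological order.

```java
/** Hypothetical helper that checks whether sampled queue depth is stable or decreasing. */
final class QueueDepthTrend {
    /** Returns true when no sample is larger than the one before it. */
    static boolean isStableOrDecreasing(long[] samples) {
        for (int i = 1; i < samples.length; i++) {
            if (samples[i] > samples[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] afterFix = {1200, 900, 900, 400};
        System.out.println(isStableOrDecreasing(afterFix));
    }
}
```

On a live queue a single upward blip is normal, so sampling at intervals of a minute or more gives a more meaningful result than back-to-back reads.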
Prevention
To prevent ActiveMQ slow consumer issues from recurring, implement these proactive measures:
1. Monitor Consumer Performance
groups:
  - name: activemq-consumers
    rules:
      - alert: ActiveMQSlowConsumer
        expr: |
          activemq_queue_consumer_dispatch_rate < 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ActiveMQ consumer dispatch rate below 10 msg/s"
2. Configure Abort Slow Consumer
<!-- activemq.xml -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">">
        <slowConsumerStrategy>
          <abortSlowConsumerStrategy maxSlowDuration="30000"
                                     checkPeriod="60000"
                                     abortConnection="false"/>
        </slowConsumerStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
3. Optimize Prefetch Settings
<!-- ActiveMQ connection factory configuration -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
  <property name="prefetchPolicy">
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
      <property name="queuePrefetch" value="100"/>
      <property name="topicPrefetch" value="100"/>
    </bean>
  </property>
</bean>
Best Practices Checklist
- [ ] Monitor consumer dispatch rate
- [ ] Configure abort slow consumer
- [ ] Optimize prefetch settings
- [ ] Scale consumers horizontally
- [ ] Monitor queue depth
- [ ] Test consumer failover
Related Issues
- [Fix ActiveMQ Destination Full](/articles/fix-activemq-destination-full)
- [Fix ActiveMQ Master Slave Failover](/articles/fix-activemq-master-slave-failover)
- [Fix RabbitMQ Consumer Timeout](/articles/fix-rabbitmq-consumer-timeout)
Related Articles
- [Fix ActiveMQ Broker Down Issue in Messaging](fix-activemq-broker-down)
- [Fix ActiveMQ Destination Full](fix-activemq-destination-full)
- [Fix ActiveMQ JDBC Lock Expired](fix-activemq-jdbc-lock-expired)
- [Fix ActiveMQ Kaha PIndex Corrupted](fix-activemq-kaha-pindex-corrupted)
- [Fix ActiveMQ Master Slave Failover](fix-activemq-master-slave-failover)