Introduction

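ActiveMQ dispatches messages to consumers in batches, up to each consumer's prefetch limit, so that a consumer always has work buffered locally. When a consumer does not acknowledge the messages in its prefetch buffer fast enough, the broker flags it as a "slow consumer". Messages then sit in that consumer's buffer while the queue backlog grows, and depending on the destination policy the broker may log warnings or act against the consumer. Either way, throughput for the whole queue can suffer.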
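To make the effect of prefetch concrete, here is a minimal simulation sketch. It is a simplified model and not the broker's actual dispatch algorithm: it assumes round-robin initial dispatch, a prefetch of at least 1, and that each acknowledgment immediately frees one buffer slot. The class and method names are hypothetical.

```java
class PrefetchSimulation {

    /**
     * Returns the simulated time in milliseconds until every message is processed.
     * perMessageMillis[i] is the per-message processing time of consumer i.
     * Assumes prefetch >= 1.
     */
    static long drainTimeMillis(int messages, int prefetch, long[] perMessageMillis) {
        int consumers = perMessageMillis.length;
        int[] assigned = new int[consumers];   // messages dispatched to each consumer so far
        int[] completed = new int[consumers];  // messages each consumer has finished
        int remaining = messages;              // messages still held by the broker

        // Initial dispatch: round-robin until every buffer is full or the queue is empty.
        boolean dispatched = true;
        while (remaining > 0 && dispatched) {
            dispatched = false;
            for (int c = 0; c < consumers && remaining > 0; c++) {
                if (assigned[c] < prefetch) {
                    assigned[c]++;
                    remaining--;
                    dispatched = true;
                }
            }
        }

        long finish = 0;
        while (true) {
            // Find the consumer whose next message completes earliest.
            int next = -1;
            long nextTime = Long.MAX_VALUE;
            for (int c = 0; c < consumers; c++) {
                if (completed[c] < assigned[c]) {
                    long t = (completed[c] + 1L) * perMessageMillis[c];
                    if (t < nextTime) {
                        nextTime = t;
                        next = c;
                    }
                }
            }
            if (next < 0) {
                break; // nothing left in any buffer
            }
            completed[next]++;
            finish = Math.max(finish, nextTime);
            // The acknowledgment frees one slot, so the broker sends one more message.
            if (remaining > 0) {
                assigned[next]++;
                remaining--;
            }
        }
        return finish;
    }

    public static void main(String[] args) {
        long[] speeds = {10, 100}; // one fast consumer (10 ms), one slow consumer (100 ms)
        System.out.println("prefetch=1000: " + drainTimeMillis(1000, 1000, speeds) + " ms");
        System.out.println("prefetch=1:    " + drainTimeMillis(1000, 1, speeds) + " ms");
    }
}
```

In this model, a prefetch of 1000 hands half of the 1000 messages to the slow consumer up front, so the queue takes 50 seconds to drain. With a prefetch of 1 the fast consumer picks up most of the work and the queue drains in roughly 9 seconds.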

Symptoms

Slow consumer warning:

```bash
$ tail -f /var/log/activemq/activemq.log

[WARN] Queue://orders.queue - Consumer Consumer:1 has been slow for 30000ms
[WARN] Slow consumer detected: prefetch not being consumed
```

Queue backlog:

```bash
[WARN] Queue depth exceeded 10000 messages due to slow consumer
[INFO] Prefetch for consumer Consumer:1 reduced from 1000 to 100
```

Consumer blocked:

```bash
[ERROR] Consumer blocked waiting for acknowledge
[WARN] Message backlog growing, producers may be blocked
```

Common Causes

  1. Prefetch too high - The consumer receives more messages than it can handle
  2. Slow message processing - Complex business logic or database calls
  3. Consumer resource limits - CPU, memory, or thread pool exhausted
  4. Network latency - Slow network between broker and consumer
  5. Blocking operations - The consumer performs blocking I/O in the message handler
  6. Single-threaded consumer - Messages cannot be processed concurrently

Step-by-Step Fix

Step 1: Identify Slow Consumers

```bash
# List consumer (subscription) MBeans for a queue
curl -u admin:admin "http://localhost:8161/api/jolokia/search/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,endpoint=Consumer,*"

# Check consumer statistics (the queue MBean lists its subscriptions)
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/Subscriptions"

# Via activemq-admin
activemq-admin query --objname type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue --view ConsumerCount,QueueSize

# Check prefetch size for every consumer on the queue
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,endpoint=Consumer,clientId=*,consumerId=*/PrefetchSize"
```

Step 2: Reduce Prefetch Size

```java
// Set prefetch in connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory.getPrefetchPolicy().setQueuePrefetch(10);  // Reduce from default 1000

// Or in connection URI
ConnectionFactory uriFactory = new ActiveMQConnectionFactory(
    "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10"
);

// For specific consumer
Queue queue = session.createQueue("orders.queue?consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);

// Recommended prefetch values based on message processing time:
// < 10ms per message:   prefetch 100-1000
// 10-100ms per message: prefetch 10-50
// > 100ms per message:  prefetch 1-10
```
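The recommended ranges above can be turned into a small helper. This is a sketch only: `PrefetchAdvisor` and its methods are hypothetical names, it returns the upper end of each range as a starting point, and the URL helper assumes a plain `tcp://` broker URL rather than a composite `failover:` URI.

```java
class PrefetchAdvisor {

    /** Upper end of the suggested queue prefetch range for an average processing time. */
    static int suggestQueuePrefetch(long avgProcessingMillis) {
        if (avgProcessingMillis < 10) {
            return 1000;   // fast handlers: prefetch 100-1000
        }
        if (avgProcessingMillis <= 100) {
            return 50;     // moderate handlers: prefetch 10-50
        }
        return 10;         // slow handlers: prefetch 1-10
    }

    /** Appends the queue prefetch option to a broker URL, respecting an existing query string. */
    static String withQueuePrefetch(String brokerUrl, int prefetch) {
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.queuePrefetch=" + prefetch;
    }

    public static void main(String[] args) {
        int prefetch = suggestQueuePrefetch(250); // handler averages 250 ms per message
        System.out.println(withQueuePrefetch("tcp://localhost:61616", prefetch));
    }
}
```

Treat the result as a first guess and tune it against measured throughput, since the right value also depends on message size and the number of competing consumers.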

Step 3: Configure Consumer Priority

```xml
<!-- In activemq.xml, configure strict priority dispatch -->
<policyEntry queue="orders.queue"
             prioritizedMessages="true"
             useConsumerPriority="true"
             strictOrderDispatch="true">
  <dispatchPolicy>
    <strictOrderDispatchPolicy/>
  </dispatchPolicy>
</policyEntry>

<!-- Consumers with higher priority get messages first -->
<!-- Allows fast consumers to process while slow ones catch up -->
```

```java
// Set consumer priority
Queue queue = session.createQueue("orders.queue?consumer.priority=10");
MessageConsumer consumer = session.createConsumer(queue);

// Priority ranges from 0-127
// Higher priority = more messages dispatched
```

Step 4: Optimize Message Processing

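```java
// Before: slow single-threaded processing
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
    try {
        processMessage(message);  // Blocking, slow operation
        session.commit();
    } catch (JMSException e) {
        log.error("Processing failed", e);
    }
});

// After: parallel processing with one transacted session and consumer per worker.
// A JMS Session is single-threaded, so committing a shared session from a
// thread pool is unsafe. Give each worker its own session instead.
for (int i = 0; i < 10; i++) {
    Session workerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    MessageConsumer worker = workerSession.createConsumer(queue);
    worker.setMessageListener(message -> {
        try {
            processMessage(message);
            workerSession.commit();
        } catch (JMSException e) {
            log.error("Processing failed", e);
        }
    });
}

// Or use Spring's DefaultMessageListenerContainer
@Bean
public DefaultMessageListenerContainer jmsContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setDestinationName("orders.queue");
    container.setMessageListener(orderListener());  // your MessageListener bean (name is illustrative)
    container.setConcurrentConsumers(10);  // Multiple concurrent consumers
    container.setMaxConcurrentConsumers(20);
    container.setSessionTransacted(true);
    return container;
}
```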

Step 5: Check Consumer Resource Usage

```bash
# Check consumer process CPU and memory
top -p <consumer_pid>
ps -p <consumer_pid> -o %cpu,%mem,vsz,rss

# Check thread count
ps -o nlwp= -p <consumer_pid>

# Check open connections
lsof -p <consumer_pid> | grep TCP

# Check JVM memory (for Java consumers)
jstat -gc <consumer_pid>
jcmd <consumer_pid> GC.heap_info

# If resources are exhausted, scale consumers or optimize processing
```

Step 6: Use Async Message Processing

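```java
// Auto acknowledgment: the session acknowledges each message automatically
// once receive() or the message listener returns
Session autoAckSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Or client acknowledgment for explicit control
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();

// Process message asynchronously.
// Note: in CLIENT_ACKNOWLEDGE mode, acknowledge() acknowledges every message
// the session has consumed so far, and a Session is not thread-safe, so do
// not call receive() on this session again until the task completes.
executor.submit(() -> {
    processMessage(message);
    message.acknowledge();  // Acknowledge after processing
    return null;            // Makes this a Callable, which may throw JMSException
});
```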

Step 7: Configure Abort Slow Consumers

```xml
<!-- In activemq.xml, configure slow consumer handling -->
<policyEntry queue=">">
  <slowConsumerStrategy>
    <abortSlowConsumerStrategy maxSlowDuration="30000" abortConnection="true"/>
  </slowConsumerStrategy>
</policyEntry>

<!-- maxSlowDuration: Time in ms a consumer may stay slow before it is aborted -->
<!-- abortConnection: Also close the consumer's connection -->

<!-- Alternative: abort consumers that have not acknowledged within a time window -->
<policyEntry queue=">">
  <slowConsumerStrategy>
    <abortSlowAckConsumerStrategy maxTimeSinceLastAck="30000"/>
  </slowConsumerStrategy>
</policyEntry>
```

Step 8: Monitor Consumer Lag

```bash
# Check dispatched (in-flight) queue size per consumer
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,endpoint=Consumer,clientId=*,consumerId=*/DispatchedQueueSize"

# Compare dispatched vs. acknowledged (dequeued) counts per consumer
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,endpoint=Consumer,clientId=*,consumerId=*/DispatchedCounter"
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue,endpoint=Consumer,clientId=*,consumerId=*/DequeueCounter"

# If dispatched >> dequeued, the consumer is slow

# Monitor processing time by adding timing to the consumer code (Java):
#   long start = System.currentTimeMillis();
#   processMessage(message);
#   long elapsed = System.currentTimeMillis() - start;
#   log.info("Processing time: {} ms", elapsed);
```
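Logging each processing time is a start, but an aggregate is easier to act on. The sketch below collects the elapsed times from the handler and derives the average, the maximum, and the rate one consumer thread can sustain. `ProcessingStats` is a hypothetical helper, not part of ActiveMQ or JMS.

```java
import java.util.concurrent.atomic.AtomicLong;

class ProcessingStats {
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalMillis = new AtomicLong();
    private final AtomicLong maxMillis = new AtomicLong();

    /** Record one handler invocation; safe to call from several consumer threads. */
    void record(long elapsedMillis) {
        count.incrementAndGet();
        totalMillis.addAndGet(elapsedMillis);
        maxMillis.accumulateAndGet(elapsedMillis, Math::max);
    }

    double averageMillis() {
        long n = count.get();
        return n == 0 ? 0.0 : (double) totalMillis.get() / n;
    }

    long maxMillis() {
        return maxMillis.get();
    }

    /** Messages per second a single consumer thread can sustain at the observed average. */
    double sustainableRatePerSecond() {
        double avg = averageMillis();
        return avg == 0.0 ? Double.POSITIVE_INFINITY : 1000.0 / avg;
    }

    public static void main(String[] args) {
        ProcessingStats stats = new ProcessingStats();
        stats.record(10);
        stats.record(30);
        System.out.println("avg ms:  " + stats.averageMillis());
        System.out.println("max ms:  " + stats.maxMillis());
        System.out.println("msg/sec: " + stats.sustainableRatePerSecond());
    }
}
```

In a real consumer, `record(elapsed)` would be called right after the timing code in the message handler, and the sustainable rate compared with the queue's enqueue rate.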

Step 9: Scale Consumer Count

```bash
# Check current consumer count
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/ConsumerCount"

# Add more consumers
# Java: use concurrent consumers
#   DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
#   container.setConcurrentConsumers(5);
#   container.setMaxConcurrentConsumers(20);

# Or deploy multiple consumer instances
# Kubernetes: scale the consumer deployment
kubectl scale deployment order-consumer --replicas=10

# Ensure all consumers read from the same queue so the load is balanced
```
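How many consumers are enough can be estimated from the arrival rate and the measured processing time. The following sketch applies simple capacity arithmetic: consumers needed = arrival rate × processing time ÷ target utilization. The class name, method, and utilization headroom are assumptions for illustration, and the estimate ignores bursts and broker overhead.

```java
class ConsumerSizing {

    /**
     * Estimates the number of concurrent consumers needed to keep up with the queue.
     *
     * @param arrivalsPerSecond   average enqueue rate
     * @param avgProcessingMillis average handler time per message
     * @param targetUtilization   fraction of each consumer's time to plan for, in (0, 1]
     */
    static int requiredConsumers(double arrivalsPerSecond,
                                 double avgProcessingMillis,
                                 double targetUtilization) {
        if (targetUtilization <= 0.0 || targetUtilization > 1.0) {
            throw new IllegalArgumentException("targetUtilization must be in (0, 1]");
        }
        // Consumers that would be busy all the time at this load.
        double busyConsumers = arrivalsPerSecond * avgProcessingMillis / 1000.0;
        return Math.max(1, (int) Math.ceil(busyConsumers / targetUtilization));
    }

    public static void main(String[] args) {
        // 200 msg/s at 50 ms each, planning for 80% utilization
        System.out.println(requiredConsumers(200, 50, 0.8));
    }
}
```

For 200 messages per second at 50 ms each, ten consumers would be fully busy, so planning for 80% utilization gives 13.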

Step 10: Check Network Performance

```bash
# Test network latency to broker
ping activemq-host

# Check network interface statistics (errors, drops)
netstat -i

# Measure message throughput
# On broker:
activemq-admin query --objname type=Broker --view TotalMessageCount,TotalDequeueCount

# Monitor connection bandwidth
iftop -i eth0

# If latency is high, increase prefetch and reduce acknowledgments
# Or move consumers closer to the broker
```

Consumer Performance Settings

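| Setting | Default | Recommended for slow consumers |
|---|---|---|
| prefetchPolicy.queuePrefetch | 1000 | 10-100 |
| concurrentConsumers | 1 | 5-20 |
| Slow consumer abort (abortSlowConsumerStrategy maxSlowDuration) | Disabled (no strategy configured) | 30000 ms |
| consumer.priority | 0 | Higher for fast consumers |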

Verification

```bash
# After configuration changes
# Restart consumers

# Check dequeue count (sample it twice; the difference over time is the consumption rate)
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/DequeueCount"

# Watch for slow consumer warnings
tail -f /var/log/activemq/activemq.log | grep -i "slow consumer"

# Should show no warnings

# Check queue depth
curl -u admin:admin "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=orders.queue/QueueSize"

# Should be stable or decreasing

# Monitor consumer statistics
activemq-admin query --objname type=Broker,destinationType=Queue --view QueueSize,ConsumerCount,DequeueCount
```
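"Stable or decreasing" is easier to judge from several `QueueSize` samples than from a single reading. As a rough sketch, the helper below fits a least-squares line through equally spaced samples and checks the sign of the slope. `QueueDepthTrend` is a hypothetical name, and the samples are assumed to be taken at a fixed interval.

```java
class QueueDepthTrend {

    /** Least-squares slope of the samples, in messages per sampling interval. */
    static double slope(long[] samples) {
        int n = samples.length;
        if (n < 2) {
            throw new IllegalArgumentException("need at least two samples");
        }
        double meanX = (n - 1) / 2.0;
        double meanY = 0.0;
        for (long s : samples) {
            meanY += s;
        }
        meanY /= n;

        double numerator = 0.0;
        double denominator = 0.0;
        for (int i = 0; i < n; i++) {
            numerator += (i - meanX) * (samples[i] - meanY);
            denominator += (i - meanX) * (i - meanX);
        }
        return numerator / denominator;
    }

    /** True when the backlog is flat or shrinking over the sampled window. */
    static boolean isStableOrDecreasing(long[] samples) {
        return slope(samples) <= 0.0;
    }

    public static void main(String[] args) {
        long[] queueSizes = {1000, 900, 850, 700}; // e.g. one QueueSize reading per minute
        System.out.println("slope: " + slope(queueSizes));
        System.out.println("healthy: " + isStableOrDecreasing(queueSizes));
    }
}
```

A positive slope after the changes means consumers are still falling behind, so revisit prefetch, processing time, or consumer count.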

Prevention

To prevent ActiveMQ slow consumer issues from recurring, implement these proactive measures:

1. Monitor Consumer Performance

```yaml
groups:
- name: activemq-consumers
  rules:
  - alert: ActiveMQSlowConsumer
    # The metric name depends on how your JMX exporter maps ActiveMQ MBeans
    expr: |
      activemq_queue_consumer_dispatch_rate < 10
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "ActiveMQ consumer dispatch rate below 10 msg/s"
```

2. Configure Abort Slow Consumer

```xml
<!-- activemq.xml -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">">
        <slowConsumerStrategy>
          <abortSlowConsumerStrategy maxSlowDuration="30000"
                                     checkPeriod="60000"
                                     abortConnection="false"/>
        </slowConsumerStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```

3. Optimize Prefetch Settings

```xml
<!-- ActiveMQ connection factory configuration -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
  <property name="prefetchPolicy">
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
      <property name="queuePrefetch" value="100"/>
      <property name="topicPrefetch" value="100"/>
    </bean>
  </property>
</bean>
```

Best Practices Checklist

  • [ ] Monitor consumer dispatch rate
  • [ ] Configure abort slow consumer
  • [ ] Optimize prefetch settings
  • [ ] Scale consumers horizontally
  • [ ] Monitor queue depth
  • [ ] Test consumer failover
