Introduction

Celery workers connect to a message broker (Redis or RabbitMQ) to receive tasks. When the broker connection is lost, workers stop receiving tasks and producers can fail to publish new ones, so task processing stalls and the application degrades.

Symptoms

Connection lost error:

```python
kombu.exceptions.OperationalError: Socket closed.
# or
amqp.exceptions.ConnectionForced: Connection was closed remotely.
# or
redis.exceptions.ConnectionError: Error while reading from socket: (104, 'Connection reset by peer')
```

Worker logs:

```bash
$ celery -A tasks worker -l info

[2024-04-15 10:00:00: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0: Error while reading from socket: (104, 'Connection reset by peer'). Trying again in 2.00 seconds...
```

Heartbeat timeout:

```bash
[ERROR/MainProcess] Heartbeat missed from worker@node1
[WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
```

Common Causes

  1. No heartbeat configured - Broker closes idle connections
  2. Network instability - Intermittent network issues
  3. Broker overloaded - Redis/RabbitMQ under heavy load
  4. Connection timeout - Long idle periods without activity
  5. Firewall rules - Firewall terminating idle connections
  6. Broker restart - Broker service restarted
  7. Resource exhaustion - Broker hitting memory/connection limits

Step-by-Step Fix

Step 1: Check Broker Status

```bash
# Check Redis status (ping should answer PONG)
redis-cli ping
redis-cli info stats | grep total_connections
redis-cli info clients

# Check RabbitMQ status
rabbitmqctl status
rabbitmqctl list_connections
rabbitmqctl list_queues

# Check broker logs
tail -f /var/log/redis/redis-server.log
tail -f /var/log/rabbitmq/rabbit@hostname.log
```
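If `redis-cli` isn't installed on the worker host, the same `PING` check can be run with Python's standard library. This is a minimal sketch that assumes a Redis server without `AUTH` or TLS; `redis_ping` is a hypothetical helper, not part of Celery or redis-py.

```python
import socket


def redis_ping(host: str = "localhost", port: int = 6379, timeout: float = 2.0) -> bool:
    """Return True if a Redis server answers PING with +PONG.

    Assumes no AUTH and no TLS: a password-protected server replies with an
    error instead of +PONG, which this sketch reports as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # RESP encoding of PING: an array holding one bulk string
            sock.sendall(b"*1\r\n$4\r\nPING\r\n")
            reply = sock.recv(64)
    except OSError:
        # Refused, timed out or reset: the broker is unreachable from here
        return False
    return reply.startswith(b"+PONG")


if __name__ == "__main__":
    print("PONG" if redis_ping() else "Redis did not answer PING")
```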

Step 2: Configure Heartbeat Settings

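```python
# celeryconfig.py

# Connection settings shared by both brokers
broker_connection_timeout = 4        # seconds to wait while opening a connection
broker_connection_retry = True       # reconnect automatically after a drop
broker_connection_max_retries = 10   # stop after 10 attempts (None = retry forever)

# Option A: Redis broker
# broker_heartbeat only applies to the pyamqp (RabbitMQ) transport,
# so Redis connections are kept healthy through kombu transport options.
broker_url = 'redis://localhost:6379/0'
broker_transport_options = {
    'health_check_interval': 30,   # PING a connection that has been idle this long
    'socket_keepalive': True,      # enable TCP keepalive on broker sockets
}

# Option B: RabbitMQ broker (use instead of Option A)
# broker_url = 'amqp://guest@localhost//'
# broker_heartbeat = 60            # heartbeat in seconds, negotiated with the server

# Worker (event) heartbeats are a command-line option, not a setting here:
#   celery -A tasks worker --heartbeat-interval 60
```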
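The RabbitMQ documentation describes heartbeat frames as going out roughly every `heartbeat / 2` seconds, with the peer treated as unreachable after two missed heartbeats. Under that rule, half the heartbeat value has to stay below the idle timeout of any firewall or load balancer between worker and broker. The sketch below encodes that rule of thumb; `check_heartbeat` is a hypothetical helper, and `idle_timeout` is an assumption you must look up for your own network devices.

```python
def check_heartbeat(heartbeat: float, idle_timeout: float) -> dict:
    """Compare an AMQP heartbeat value with a network idle timeout (seconds).

    Rule of thumb from the RabbitMQ docs: frames are sent about every
    heartbeat / 2 seconds, so that gap must be shorter than the idle timeout.
    """
    if heartbeat <= 0:
        # 0 disables heartbeats; only TCP keepalive (if any) is left
        return {"frame_interval": None, "survives_idle_timeout": False}
    frame_interval = heartbeat / 2
    return {
        "frame_interval": frame_interval,
        "survives_idle_timeout": frame_interval < idle_timeout,
    }


print(check_heartbeat(60, idle_timeout=300))   # frames every 30 s, well inside 300 s
print(check_heartbeat(900, idle_timeout=300))  # frames every 450 s, the device cuts the link first
```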

Step 3: Configure Connection Pool

```python
# celeryconfig.py

# Connection pool settings
broker_pool_limit = 10               # max connections kept in the pool
broker_connection_timeout = 4
broker_connection_retry = True

# Redis broker socket timeouts are kombu transport options.
# (redis_socket_timeout / redis_socket_connect_timeout configure the
# Redis result backend, not the broker.)
broker_transport_options = {
    'socket_timeout': 5,
    'socket_connect_timeout': 5,
}

# Or, in tasks.py, set the pool limit on the app object
from celery import Celery

app = Celery('tasks')
app.conf.broker_pool_limit = 10
```
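`broker_pool_limit` caps how many broker connections a process keeps in its pool; once all of them are checked out, further callers contend for a free one. The stdlib sketch below illustrates that behaviour with a generic bounded pool in which a caller waits up to a timeout. It is a conceptual model, not kombu's implementation, and `BoundedPool` is a hypothetical class.

```python
import queue
import threading
from typing import Any, Callable, Optional


class BoundedPool:
    """Hypothetical fixed-size pool: at most `limit` objects checked out at once."""

    def __init__(self, factory: Callable[[], Any], limit: int) -> None:
        self._factory = factory                  # creates a new "connection"
        self._idle = queue.LifoQueue()           # released objects, reused first
        self._slots = threading.BoundedSemaphore(limit)

    def acquire(self, timeout: Optional[float] = None) -> Any:
        # Wait for a free slot; raise TimeoutError if none frees up in time
        if not self._slots.acquire(timeout=timeout):
            raise TimeoutError("pool exhausted: all connections are in use")
        try:
            return self._idle.get_nowait()       # reuse an idle object if one exists
        except queue.Empty:
            pass
        try:
            return self._factory()               # otherwise create a new one
        except BaseException:
            self._slots.release()                # don't leak the slot on failure
            raise

    def release(self, obj: Any) -> None:
        self._idle.put(obj)
        self._slots.release()


created = []


def make_conn() -> object:
    created.append(object())                     # stand-in for opening a broker connection
    return created[-1]


pool = BoundedPool(make_conn, limit=2)
a = pool.acquire()
b = pool.acquire()
try:
    pool.acquire(timeout=0.1)                    # third caller: both slots are busy
except TimeoutError:
    print("pool exhausted")
pool.release(a)
c = pool.acquire(timeout=0.1)                    # gets the released object back
print(c is a, len(created))                      # True 2
```

Raising the limit reduces contention when many threads or greenlets publish at once, at the cost of more open connections on the broker.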

Step 4: Check Redis Configuration

```bash
# Check Redis config
redis-cli config get timeout
redis-cli config get tcp-keepalive
redis-cli config get maxclients
redis-cli config get maxmemory

# Fix Redis timeout (0 = don't close idle connections)
redis-cli config set timeout 0

# Enable TCP keepalive (60-second interval)
redis-cli config set tcp-keepalive 60

# For persistent config, edit redis.conf:
# timeout 0
# tcp-keepalive 60
# maxclients 10000
```
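When `redis-cli` writes to a pipe, `config get` prints each parameter name on one line and its value on the next. A small helper can turn that output into a dict and flag the two idle-connection settings changed in this step. This sketch assumes that raw (non-interactive) output format; both function names are hypothetical.

```python
def parse_config_get(output: str) -> dict:
    """Turn raw `redis-cli config get ...` output (name/value lines) into a dict."""
    lines = output.splitlines()
    return dict(zip(lines[0::2], lines[1::2]))


def audit_idle_settings(cfg: dict) -> list:
    """Flag Redis settings that let idle broker connections be dropped."""
    problems = []
    if cfg.get("timeout", "0") != "0":
        problems.append(f"timeout={cfg['timeout']}: Redis closes clients idle that many seconds")
    if cfg.get("tcp-keepalive") == "0":
        problems.append("tcp-keepalive=0: no keepalive probes to detect dead peers")
    return problems


# e.g. output of: redis-cli config get timeout; redis-cli config get tcp-keepalive
sample = "timeout\n300\ntcp-keepalive\n0\n"
for problem in audit_idle_settings(parse_config_get(sample)):
    print(problem)
```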

Step 5: Check RabbitMQ Configuration

```bash
# Check the heartbeat value RabbitMQ is using
rabbitmqctl environment | grep heartbeat

# To change it, edit /etc/rabbitmq/rabbitmq.conf:
# heartbeat = 60

# Check connection limits
rabbitmqctl status | grep file_descriptors

# Increase the file descriptor limit
# SysV-style installs, in /etc/default/rabbitmq-server:
#   ulimit -n 65536
# systemd installs: run `systemctl edit rabbitmq-server` and add:
#   [Service]
#   LimitNOFILE=65536

# Restart RabbitMQ to apply the changes
systemctl restart rabbitmq-server
```

Step 6: Implement Reconnection Logic

```python
# tasks.py
from celery import Celery
from kombu.exceptions import OperationalError

app = Celery('tasks', broker='redis://localhost:6379/0')


# autoretry_for retries the task automatically when OperationalError is raised,
# so no manual try/except with self.retry() is needed
@app.task(bind=True, autoretry_for=(OperationalError,),
          retry_kwargs={'max_retries': 5, 'countdown': 2})
def send_email(self, to, subject, body):
    # Task logic here (e.g. code that publishes further messages via the broker)
    pass


# Acknowledge messages only after the task finishes, so a task interrupted by a
# lost connection or a killed worker is redelivered instead of lost
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
app.conf.task_acks_on_failure_or_timeout = True
```
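Instead of a fixed `countdown`, a Celery task can pass `retry_backoff=True` (optionally with `retry_backoff_max` and `retry_jitter`) to the task decorator to space out autoretries exponentially. The Celery docs describe the delays as 1, 2, 4, 8 seconds and so on, capped at `retry_backoff_max`; with jitter enabled, that value becomes a maximum and the real delay is random between zero and it. The stdlib sketch below reproduces that schedule so you can estimate how long a task keeps retrying through a broker outage. It approximates the documented behaviour; `backoff_delay` is a hypothetical helper, not Celery's code.

```python
import random
from typing import Optional


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = False, rng: Optional[random.Random] = None) -> int:
    """Seconds to wait before retry number `retries` (0 = first retry).

    factor * 2**retries, capped at `maximum`; with jitter, a random
    value between 0 and that cap (inclusive).
    """
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        delay = (rng or random).randint(0, delay)
    return delay


schedule = [backoff_delay(n) for n in range(5)]
print(schedule)                                   # [1, 2, 4, 8, 16]
print(sum(schedule), "seconds of waiting across max_retries=5")
```

In Celery itself this would be, for example, `@app.task(autoretry_for=(OperationalError,), retry_backoff=True, retry_backoff_max=60, retry_kwargs={'max_retries': 5})`; the exact timing then comes from Celery, not from this helper.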

Step 7: Monitor Connection Health

```python
# tasks.py: periodic health check task
import time

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def health_check():
    """Periodic task that keeps traffic flowing over the broker connection"""
    return {"status": "healthy", "timestamp": time.time()}


# Configure the beat schedule (needs a running `celery -A tasks beat` process)
app.conf.beat_schedule = {
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 30.0,  # Every 30 seconds
    },
}

# Also emit task events so monitoring tools can follow task activity
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True
```
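The periodic `health_check` task only helps if something notices when it stops running. Assuming you store its results somewhere you can read back (a result backend, a log line or a metric), one option is to treat the beat → broker → worker path as broken when the newest `timestamp` is older than a couple of schedule intervals. The function name and the 2× threshold below are illustrative assumptions.

```python
import time
from typing import Optional


def is_stale(last_timestamp: Optional[float], interval: float = 30.0,
             missed_allowed: int = 2, now: Optional[float] = None) -> bool:
    """True if no health_check result has arrived for more than
    `missed_allowed` schedule intervals, or none has arrived at all."""
    if last_timestamp is None:
        return True
    now = time.time() if now is None else now
    return now - last_timestamp > interval * missed_allowed


# e.g. the latest health_check result read back from storage
latest = {"status": "healthy", "timestamp": time.time() - 75}
if is_stale(latest["timestamp"], interval=30.0):
    print("health_check has not run for over 60 s: check beat, broker and workers")
```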

Step 8: Handle Network Issues

```python
# celeryconfig.py

# Connection retry settings
broker_connection_retry = True             # reconnect after an established connection drops
broker_connection_retry_on_startup = True  # also retry the first connection (Celery 5.3+)
broker_connection_max_retries = 100        # try 100 times (None = retry forever)

# producer.py (separate from celeryconfig.py): explicit retries for code
# that opens its own broker connection
from kombu import Connection


def get_connection():
    connection = Connection('redis://localhost:6379/0')
    # Retry up to 5 times, pausing 2 s before the first retry and longer after each failure
    connection.ensure_connection(max_retries=5, interval_start=2)
    return connection
```
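kombu's `ensure_connection()` accepts `interval_start`, `interval_step` and `interval_max` to control a growing pause between attempts. The sketch below applies the same linear-backoff idea to a plain TCP connection with only the standard library, for scripts that should not depend on kombu. It is not kombu's code; `connect_with_retry` is a hypothetical helper, and `sleep` is injectable so the schedule can be tested without waiting.

```python
import socket
import time
from typing import Callable


def connect_with_retry(host: str, port: int, max_retries: int = 5,
                       interval_start: float = 2.0, interval_step: float = 2.0,
                       interval_max: float = 30.0, timeout: float = 4.0,
                       sleep: Callable[[float], None] = time.sleep) -> socket.socket:
    """Open a TCP connection, retrying with a linearly growing, capped pause.

    Re-raises the last OSError once `max_retries` retries have failed.
    """
    delay = interval_start
    attempt = 0
    while True:
        try:
            return socket.create_connection((host, port), timeout=timeout)
        except OSError as exc:
            if attempt >= max_retries:
                raise
            attempt += 1
            print(f"connect failed ({exc}); retry {attempt}/{max_retries} in {delay:.1f}s")
            sleep(delay)
            delay = min(delay + interval_step, interval_max)
```

For example, `connect_with_retry("localhost", 6379)` waits 2, 4, 6, 8 and 10 seconds between its five retries before giving up.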

Step 9: Scale Broker Resources

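```bash
# Redis: check memory usage
redis-cli info memory

# Raise the Redis memory limit if needed
redis-cli config set maxmemory 2gb
# Keep the noeviction policy for a broker: eviction policies such as
# allkeys-lru can silently delete queued tasks when memory runs out
redis-cli config set maxmemory-policy noeviction

# RabbitMQ: check queue depth
rabbitmqctl list_queues name messages consumers

# Add a node to a RabbitMQ cluster (run on the joining node)
rabbitmqctl stop_app
rabbitmqctl reset            # wipes this node's local data
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app

# Monitor connections
redis-cli client list | wc -l
rabbitmqctl list_connections | wc -l
```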
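`redis-cli client list | wc -l` gives a total, but when `maxclients` is the suspect it helps to see which hosts hold the connections and how long they have sat idle. `CLIENT LIST` prints one line per connection with space-separated `key=value` fields such as `addr` and `idle`. The helper below groups connections by client IP; it's a sketch with a hypothetical function name, and the sample lines are abbreviated.

```python
from collections import Counter


def summarize_clients(client_list: str) -> dict:
    """Count Redis connections per client IP and report the longest idle time."""
    per_host = Counter()
    max_idle = 0
    for line in client_list.splitlines():
        fields = dict(item.split("=", 1) for item in line.split() if "=" in item)
        if "addr" not in fields:
            continue
        host = fields["addr"].rsplit(":", 1)[0]   # strip the client port
        per_host[host] += 1
        max_idle = max(max_idle, int(fields.get("idle", 0)))
    return {"per_host": dict(per_host), "max_idle_seconds": max_idle}


# e.g. output of: redis-cli client list
sample = (
    "id=5 addr=10.0.0.11:50122 fd=8 name= age=3600 idle=2 flags=N db=0 cmd=brpop\n"
    "id=6 addr=10.0.0.11:50124 fd=9 name= age=3600 idle=1200 flags=N db=0 cmd=publish\n"
    "id=7 addr=10.0.0.12:41800 fd=10 name= age=60 idle=0 flags=N db=0 cmd=client\n"
)
print(summarize_clients(sample))
```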

Step 10: Set Up Monitoring and Alerts

```python
# Shell setup (run these in a terminal):
#   pip install flower
#   celery -A tasks flower --port=5555
#   pip install celery-prometheus-exporter

# health.py: HTTP health check endpoint for the broker connection
from celery import Celery
from flask import Flask

app = Celery('tasks', broker='redis://localhost:6379/0')
flask_app = Flask(__name__)


@flask_app.route('/health')
def health():
    try:
        # Open (and then close) a broker connection, retrying up to 3 times
        with app.connection_for_write() as conn:
            conn.ensure_connection(max_retries=3)
        return {"status": "healthy"}, 200
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}, 503
```
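A cron job or external checker can poll the `/health` endpoint and act on its status code. The stdlib client below is a minimal sketch; it assumes the Flask app above is reachable at the URL you pass in (the default `http://localhost:5000/health` is a placeholder), and `probe_health` is a hypothetical name.

```python
import json
import urllib.error
import urllib.request
from typing import Tuple


def _parse(body: bytes) -> dict:
    """Decode a JSON body, falling back to the raw text if it isn't JSON."""
    try:
        return json.loads(body or b"{}")
    except ValueError:
        return {"raw": body.decode("utf-8", "replace")}


def probe_health(url: str = "http://localhost:5000/health",
                 timeout: float = 5.0) -> Tuple[bool, dict]:
    """Return (healthy, payload) for the health endpoint."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200, _parse(resp.read())
    except urllib.error.HTTPError as err:
        # Non-2xx answers, such as the endpoint's 503, arrive as HTTPError
        return False, _parse(err.read())
    except OSError as err:
        # The web app itself is unreachable (refused, DNS failure, timeout)
        return False, {"status": "unreachable", "error": str(err)}
```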

Broker Connection Settings

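| Setting | Default | Recommended |
|---|---|---|
| broker_heartbeat (RabbitMQ/pyamqp only) | 120 | 60 |
| broker_pool_limit | 10 | 10-50 |
| socket_timeout (Redis, via broker_transport_options) | None | 5 |
| socket_connect_timeout (Redis, via broker_transport_options) | None | 5 |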

Verification

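```bash
# After applying configuration changes, restart the Celery workers
celery -A tasks worker -l info --logfile=celery.log

# Watch for connection errors (-E makes "|" an alternation)
tail -f celery.log | grep -iE "connection|error"

# Send a test task
python -c "from tasks import health_check; print(health_check.delay().id)"

# Check worker status
celery -A tasks inspect active
celery -A tasks inspect stats

# Watch live broker traffic (MONITOR slows Redis down; run it only briefly)
redis-cli monitor | grep -iE "ping|brpop"

# With a Redis broker, the worker's BRPOP polling should appear continuously;
# a long silence suggests the connection has dropped
```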
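When `tail -f` is impractical, for example in a cron job or CI check, the same filter can be run over the log in Python and turned into counts per log level. This is a sketch assuming the worker writes Celery's default log format to `celery.log`; the patterns mirror the `grep` expression above and `scan_log` is a hypothetical helper.

```python
import re
from collections import Counter
from typing import Iterable

# Same case-insensitive alternation as: grep -iE "connection|error"
PATTERN = re.compile(r"connection|error", re.IGNORECASE)
LEVEL = re.compile(r"\b(ERROR|WARNING|CRITICAL)/")


def scan_log(lines: Iterable[str]) -> Counter:
    """Count matching log lines per level (OTHER when no level is found)."""
    counts = Counter()
    for line in lines:
        if PATTERN.search(line):
            level = LEVEL.search(line)
            counts[level.group(1) if level else "OTHER"] += 1
    return counts


sample = [
    "[2024-04-15 10:00:00: ERROR/MainProcess] consumer: Cannot connect to "
    "redis://localhost:6379/0: Error while reading from socket: (104, 'Connection reset by peer').",
    "[WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...",
    "[2024-04-15 10:00:05: INFO/ForkPoolWorker-1] Task tasks.health_check succeeded in 0.001s",
]
print(dict(scan_log(sample)))
# For a real file: with open("celery.log") as fh: print(dict(scan_log(fh)))
```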

Prevention

To keep broker connection losses from recurring, put these proactive measures in place:

1. Monitor Broker Connection

```yaml
groups:
- name: celery-broker
  rules:
  - alert: CeleryBrokerConnectionLost
    expr: |
      celery_broker_connection_status == 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "Celery broker connection lost"
```

2. Configure Connection Retry

```python
# celeryconfig.py
broker_connection_retry = True
broker_connection_max_retries = None   # Retry forever
broker_heartbeat = 60                  # Keep RabbitMQ (pyamqp) connections alive
broker_pool_limit = 10                 # Connection pool size

# Redis specific: broker socket timeouts go in the transport options
broker_transport_options = {
    'socket_timeout': 5,
    'socket_connect_timeout': 5,
}
```

3. Implement Health Checks

```python
# tasks.py
import redis
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def health_check():
    """Health check task to verify broker connectivity"""
    try:
        r = redis.Redis(host='localhost', port=6379, socket_connect_timeout=5)
        r.ping()
        return "broker_healthy"
    except redis.RedisError as e:
        return f"broker_unhealthy: {e}"


# Schedule the health check
app.conf.beat_schedule = {
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 60.0,  # Every minute
    },
}
```

Best Practices Checklist

  • [ ] Monitor broker connection status
  • [ ] Configure connection retry
  • [ ] Implement health checks
  • [ ] Use connection pooling
  • [ ] Set appropriate timeouts
  • [ ] Monitor broker health

Related Articles

  • [Fix Celery Worker Not Consuming](/articles/fix-celery-worker-not-consuming)
  • [Fix Celery Task Retry Loop](/articles/fix-celery-task-retry-loop)
  • [Fix Redis Connection Refused](/articles/fix-redis-errors)

<script type="application/ld+json"> { "@context": "https://schema.org", "@type": "TechArticle", "headline": "Fix Celery Broker Connection Lost", "description": "Troubleshoot Celery broker connection lost errors. Configure heartbeats, check Redis/RabbitMQ health, and optimize connection settings.", "url": "https://www.fixwikihub.com/fix-celery-broker-connection-lost", "publisher": { "@type": "Organization", "name": "FixWikiHub", "url": "https://www.fixwikihub.com" }, "author": { "@type": "Person", "name": "FixWikiHub Editorial Team" }, "datePublished": "2026-04-03T16:00:58.369Z", "dateModified": "2026-04-03T16:00:58.369Z" } </script>