Introduction
Celery workers connect to a message broker (Redis, RabbitMQ) to receive tasks. When the broker connection is lost, workers can't receive tasks, causing task processing failures and application degradation.
Symptoms
Connection lost error:
```
kombu.exceptions.OperationalError: Socket closed.
# or
amqp.exceptions.ConnectionForced: Connection was closed remotely.
# or
redis.exceptions.ConnectionError: Error while reading from socket: (104, 'Connection reset by peer')
```
Worker logs:
```bash
$ celery -A tasks worker -l info
[2024-04-15 10:00:00: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0: Error while reading from socket: (104, 'Connection reset by peer'). Trying again in 2.00 seconds...
```
Heartbeat timeout:
```
[ERROR/MainProcess] Heartbeat missed from worker@node1
[WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
```
Common Causes
1. No heartbeat configured: the broker closes idle connections
2. Network instability: intermittent network issues
3. Broker overloaded: Redis/RabbitMQ under heavy load
4. Connection timeout: long idle periods without activity
5. Firewall rules: a firewall terminates idle connections
6. Broker restart: the broker service was restarted
7. Resource exhaustion: the broker hits memory or connection limits
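Before working through the fixes, it can help to count how often each symptom message appears in a worker log. The sketch below is one way to do that with the standard library only; `SYMPTOMS`, `count_symptoms`, and the labels are hypothetical names chosen for illustration, and the substrings are taken from the messages shown under Symptoms.

```python
from collections import Counter

# Substrings from the symptom messages above, mapped to short labels.
# The labels are illustrative, not part of Celery or kombu.
SYMPTOMS = {
    "Socket closed": "socket_closed",
    "Connection was closed remotely": "closed_by_broker",
    "Connection reset by peer": "reset_by_peer",
    "Heartbeat missed": "heartbeat_missed",
    "Connection to broker lost": "broker_lost",
}


def count_symptoms(lines):
    """Count how many log lines contain each known symptom substring."""
    counts = Counter()
    for line in lines:
        for needle, label in SYMPTOMS.items():
            if needle in line:
                counts[label] += 1
    return counts


sample = [
    "[ERROR/MainProcess] Heartbeat missed from worker@node1",
    "[WARNING/MainProcess] consumer: Connection to broker lost. "
    "Trying to re-establish the connection...",
    "[INFO/MainProcess] Task tasks.add succeeded",
]
print(dict(count_symptoms(sample)))
```

In practice the `lines` argument could be an open log file, since iterating over a file yields one line at a time.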
Step-by-Step Fix
Step 1: Check Broker Status
```bash
# Check Redis status
redis-cli ping
redis-cli info stats | grep total_connections
redis-cli info clients

# Check RabbitMQ status
rabbitmqctl status
rabbitmqctl list_connections
rabbitmqctl list_queues

# Check broker logs (run each tail in its own terminal)
tail -f /var/log/redis/redis-server.log
tail -f /var/log/rabbitmq/rabbit@hostname.log
```
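The same reachability check can be scripted from the worker host, which is useful when `redis-cli` or `rabbitmqctl` is not installed there. This is a minimal sketch using only the standard library; `broker_reachable` is a hypothetical helper, and the ports assume the default Redis (6379) and RabbitMQ (5672) listeners. It only confirms that a TCP connection opens, not that the broker is healthy.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


if __name__ == "__main__":
    # Default ports are an assumption; adjust to your deployment.
    for name, port in (("Redis", 6379), ("RabbitMQ", 5672)):
        state = "reachable" if broker_reachable("localhost", port) else "unreachable"
        print(f"{name} on localhost:{port} is {state}")
```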
Step 2: Configure Heartbeat Settings
```python
# celeryconfig.py
# Set only one broker_url; the Redis and RabbitMQ blocks are alternatives.

# Redis configuration
broker_url = 'redis://localhost:6379/0'
broker_connection_timeout = 4
broker_connection_retry = True
broker_connection_max_retries = 10
# broker_heartbeat applies to the AMQP transport only; for Redis, rely on
# TCP keepalive and socket timeouts (Steps 3 and 4).

# RabbitMQ heartbeat configuration
# broker_url = 'amqp://guest@localhost//'
# broker_heartbeat = 60  # Heartbeat interval in seconds
# broker_connection_timeout = 4

# Worker heartbeat configuration
# The worker heartbeat interval is a command-line option, not a config setting:
#   celery -A tasks worker --heartbeat-interval 60
```
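The arithmetic behind heartbeat expiry is simple: a peer is treated as lost once no heartbeat has arrived for some multiple of the heartbeat interval. The sketch below illustrates that rule with a 60-second interval and two missed heartbeats; `is_alive` and its parameters are hypothetical names, not Celery API.

```python
def is_alive(last_heartbeat, now, interval=60.0, missed_allowed=2):
    """Return True while less than `missed_allowed` intervals have elapsed.

    Both timestamps are seconds on the same clock (for example time.time()).
    """
    return (now - last_heartbeat) < interval * missed_allowed


print(is_alive(last_heartbeat=0.0, now=90.0))   # True: only one beat missed
print(is_alive(last_heartbeat=0.0, now=120.0))  # False: two intervals elapsed
```

The interval should also stay below any idle timeout enforced by firewalls or load balancers between worker and broker, otherwise the connection can be dropped between heartbeats.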
Step 3: Configure Connection Pool
```python
# celeryconfig.py

# Connection pool settings
broker_pool_limit = 10  # Maximum number of connections in the pool
broker_connection_timeout = 4
broker_connection_retry = True

# Redis broker socket options (passed through to the Redis transport)
broker_transport_options = {
    'socket_timeout': 5,
    'socket_connect_timeout': 5,
    'retry_on_timeout': True,
}

# Redis result backend socket timeouts
redis_socket_timeout = 5
redis_socket_connect_timeout = 5

# Alternatively, set the pool limit on the app instance
from celery import Celery

app = Celery('tasks')
app.conf.broker_pool_limit = 10
```
Step 4: Check Redis Configuration
```bash
# Check Redis config
redis-cli config get timeout
redis-cli config get tcp-keepalive
redis-cli config get maxclients
redis-cli config get maxmemory

# Fix Redis timeout (0 = don't close idle connections)
redis-cli config set timeout 0

# Enable TCP keepalive
redis-cli config set tcp-keepalive 60

# For persistent config, edit redis.conf:
#   timeout 0
#   tcp-keepalive 60
#   maxclients 10000
```
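To judge whether `maxclients` is close to being exhausted, the output of `redis-cli info clients` can be compared against the configured limit. The sketch below parses the `key:value` lines of that output; `parse_redis_info` and `client_usage` are hypothetical helpers, and the sample text uses made-up numbers in place of real command output.

```python
def parse_redis_info(text):
    """Parse 'key:value' lines from `redis-cli info` output into a dict."""
    info = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and section headers such as "# Clients"
        key, _, value = line.partition(":")
        info[key] = value
    return info


def client_usage(info_text, maxclients):
    """Return connected_clients divided by maxclients."""
    info = parse_redis_info(info_text)
    return int(info["connected_clients"]) / maxclients


# Illustrative sample, not captured from a real server.
sample = "# Clients\r\nconnected_clients:8500\r\nblocked_clients:3\r\n"
usage = client_usage(sample, maxclients=10000)
print(f"{usage:.0%} of maxclients in use")  # prints "85% of maxclients in use"
```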
Step 5: Check RabbitMQ Configuration
```bash
# Check RabbitMQ heartbeat
rabbitmqctl environment | grep heartbeat

# Edit /etc/rabbitmq/rabbitmq.conf and set:
#   heartbeat = 60

# Check connection limits
rabbitmqctl status | grep file_descriptors

# Increase file descriptor limit
# In /etc/default/rabbitmq-server:
#   ulimit -n 65536

# Restart RabbitMQ
systemctl restart rabbitmq-server
```
Step 6: Implement Reconnection Logic
```python
# tasks.py
from celery import Celery
from celery.exceptions import OperationalError

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task(
    bind=True,
    autoretry_for=(OperationalError,),
    retry_kwargs={'max_retries': 5, 'countdown': 2},
)
def send_email(self, to, subject, body):
    # Task logic here. An OperationalError raised in the body is retried
    # automatically by autoretry_for, so no manual self.retry() is needed.
    pass


# Redeliver tasks whose worker dies mid-task (tasks should be idempotent)
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
app.conf.task_acks_on_failure_or_timeout = True
```
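Outside of Celery tasks, for example in code that publishes tasks from a web request, the same idea can be applied by hand. The sketch below shows one common pattern, exponential backoff with jitter, using only the standard library; `call_with_backoff` and its parameters are hypothetical, and the built-in `ConnectionError` stands in for whichever broker exception your client raises.

```python
import random
import time


def call_with_backoff(func, retries=5, base_delay=2.0, max_delay=30.0,
                      retry_on=(ConnectionError,), sleep=time.sleep):
    """Call func(), retrying on `retry_on` with exponential backoff and jitter."""
    for attempt in range(retries + 1):
        try:
            return func()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads out reconnects from many clients.
            sleep(delay * random.uniform(0.5, 1.0))


attempts = {"n": 0}


def flaky():
    """Fail twice, then succeed, to simulate a broker that comes back."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("broker unavailable")
    return "sent"


print(call_with_backoff(flaky, sleep=lambda s: None))  # prints "sent"
```

Passing `sleep` as a parameter keeps the demo instant; real callers would leave it at the default `time.sleep`.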
Step 7: Monitor Connection Health
```python
# Add health check task
import time

from celery import shared_task


@shared_task
def health_check():
    """Periodic task to keep connection alive"""
    return {"status": "healthy", "timestamp": time.time()}


# Configure beat schedule (app is the Celery instance from tasks.py)
app.conf.beat_schedule = {
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 30.0,  # Every 30 seconds
    },
}

# Or use celery events
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True
```
Step 8: Handle Network Issues
```python
# celeryconfig.py

# Connection retry settings
broker_connection_retry = True
broker_connection_max_retries = 100  # Try 100 times
# Celery has no broker_connection_retry_delay setting; the worker backs off
# on its own between attempts ("Trying again in 2.00 seconds...").

# Use connection recovery
from kombu import Connection


def get_connection():
    connection = Connection('redis://localhost:6379/0')
    connection.ensure_connection(max_retries=5, interval_start=2)
    return connection


# In Celery config (Redis transport): retry socket operations that time out
app.conf.broker_transport_options = {'retry_on_timeout': True}
```
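Besides `interval_start`, kombu's `ensure_connection` also accepts `interval_step` and `interval_max`, so the wait between attempts grows over time. The sketch below approximates such a schedule to estimate how long a given retry budget lasts; it assumes each wait grows by a fixed step up to a cap, and `retry_intervals` is a hypothetical helper, not kombu's implementation.

```python
def retry_intervals(max_retries, interval_start=2.0, interval_step=2.0,
                    interval_max=30.0):
    """Yield the wait before each retry: start, start + step, ..., capped."""
    for attempt in range(max_retries):
        yield min(interval_start + attempt * interval_step, interval_max)


waits = list(retry_intervals(5))
print(waits)       # [2.0, 4.0, 6.0, 8.0, 10.0]
print(sum(waits))  # 30.0
```

Under these assumed values, 100 retries add up to 2790 seconds of waiting, roughly 46 minutes, which is worth comparing against how long a broker restart or failover usually takes in your environment.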
Step 9: Scale Broker Resources
```bash
# Redis: Check memory usage
redis-cli info memory

# Increase Redis memory if needed
redis-cli config set maxmemory 2gb
# Use noeviction on a broker: allkeys-lru can evict keys that hold queued tasks
redis-cli config set maxmemory-policy noeviction

# RabbitMQ: Check queue depth
rabbitmqctl list_queues name messages consumers

# Scale RabbitMQ cluster (run on the node that is joining)
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app

# Monitor connections
redis-cli client list | wc -l
rabbitmqctl list_connections | wc -l
```
Step 10: Set Up Monitoring and Alerts
```python
# Shell: add Flower for monitoring
#   pip install flower
#   celery -A tasks flower --port=5555
#
# Shell: configure Prometheus metrics
#   pip install celery-prometheus-exporter

# Add custom health check endpoint
from celery import Celery
from flask import Flask

app = Celery('tasks')
flask_app = Flask(__name__)


@flask_app.route('/health')
def health():
    try:
        # Open a short-lived broker connection and close it afterwards
        with app.connection() as conn:
            conn.ensure_connection(max_retries=3)
        return {"status": "healthy"}, 200
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}, 503
```
Broker Connection Settings
| Setting | Default | Recommended |
|---|---|---|
| broker_heartbeat (AMQP transport only) | 120 | 60 |
| broker_pool_limit | 10 | 10-50 |
| socket_timeout (Redis, via broker_transport_options) | None | 5 |
| socket_connect_timeout (Redis, via broker_transport_options) | None | 5 |
Verification
```bash
# After applying configuration changes, restart Celery workers
celery -A tasks worker -l info

# Watch for connection errors (-E is needed for the | alternation)
tail -f celery.log | grep -iE "connection|error"

# Test task execution (assumes tasks.py defines an add task)
python -c "from tasks import add; add.delay(2, 3)"

# Check worker status
celery -A tasks inspect active
celery -A tasks inspect stats

# Monitor connection
redis-cli monitor | grep -i ping
# With connection health checks active, this should show periodic PING commands
```
Prevention
To keep broker connection losses from recurring, put these measures in place:
1. Monitor Broker Connection
```yaml
groups:
  - name: celery-broker
    rules:
      - alert: CeleryBrokerConnectionLost
        expr: |
          celery_broker_connection_status == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Celery broker connection lost"
```
2. Configure Connection Retry
```python
# celeryconfig.py
broker_connection_retry = True
broker_connection_max_retries = None  # Retry forever
broker_heartbeat = 60                 # Keep connection alive (AMQP transport)
broker_pool_limit = 10                # Connection pool size
# The delay between retries is not a Celery setting; the worker backs off on its own.

# Redis specific (broker socket options)
broker_transport_options = {
    'socket_timeout': 5,
    'socket_connect_timeout': 5,
}
```
3. Implement Health Checks
```python
# tasks.py
from celery import shared_task
import redis


@shared_task
def health_check():
    """Health check task to verify broker connectivity"""
    try:
        # The socket timeout keeps the check from hanging on a dead broker
        r = redis.Redis(host='localhost', port=6379, socket_timeout=5)
        r.ping()
        return "broker_healthy"
    except Exception as e:
        return f"broker_unhealthy: {e}"


# Schedule health check (in celeryconfig.py)
beat_schedule = {
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 60.0,  # Every minute
    },
}
```
Best Practices Checklist
- [ ] Monitor broker connection status
- [ ] Configure connection retry
- [ ] Implement health checks
- [ ] Use connection pooling
- [ ] Set appropriate timeouts
- [ ] Monitor broker health
Related Issues
- [Fix Celery Worker Not Consuming](/articles/fix-celery-worker-not-consuming)
- [Fix Celery Task Retry Loop](/articles/fix-celery-task-retry-loop)
- [Fix Redis Connection Refused](/articles/fix-redis-errors)