Level: Intermediate · Reading time: 10 min · Tags: WebSocket, 1011

1011 Internal Error — Server Crash During Stream

Symptoms

- Active WebSocket connection closes abruptly with code 1011 (or 1006 if the server crashes without sending a close frame)
- Real-time data stream (chat, stock prices, live notifications) stops unexpectedly
- Server logs show an unhandled exception traceback in the WebSocket handler function
- All connections to one worker process drop simultaneously (worker OOM or crash)
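- The client reconnects successfully afterwards, which points to a transient failure (for example, corrupted in-memory state or a worker that crashed and was restarted) rather than a persistent outage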

Root Causes

  • Unhandled exception raised inside the WebSocket message handler (e.g., KeyError, AttributeError)
  • Server running out of memory while processing concurrent WebSocket connections
  • Serialization error when broadcasting a message containing an unexpected non-serializable type
  • Race condition in shared state accessed concurrently by multiple WebSocket connection handlers
  • Worker process (Gunicorn worker, Node.js cluster worker) crashing, killing all its connections

Diagnosis

**Step 1 — Find the crash in server logs**

```bash
# Pull recent errors plus the traceback lines that follow them (widen --since to cover the 1011 close):
sudo journalctl -u gunicorn-myapp --since '30 minutes ago' --no-pager | grep -E -A 20 'ERROR|Exception|Traceback'

# Django Channels: look for ASGI consumer exceptions:
grep -riE 'WebSocket.*ERROR|consumer.*error' /var/log/myapp/ | tail -20
```
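Grepping for one failing connection is much easier when every log line it produces carries the same correlation ID. A minimal sketch using only the standard library's `logging.LoggerAdapter`; the `connection_id` field name, the 12-character hex ID, and the `connection_logger` helper are illustrative assumptions, not a Channels or Gunicorn convention.

```python
import io
import logging
import uuid

# Every record formatted with FORMAT must carry connection_id, so log through the adapter,
# not the bare logger (a bare-logger record would fail to format).
FORMAT = "%(levelname)s [conn=%(connection_id)s] %(name)s: %(message)s"

def connection_logger(base: logging.Logger, connection_id: str = "") -> logging.LoggerAdapter:
    """Hypothetical helper: wrap `base` so each record is stamped with a per-connection ID."""
    cid = connection_id or uuid.uuid4().hex[:12]
    return logging.LoggerAdapter(base, {"connection_id": cid})

if __name__ == "__main__":
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(FORMAT))
    base = logging.getLogger("myapp.ws")
    base.addHandler(handler)
    base.setLevel(logging.INFO)

    log = connection_logger(base, "abc123")  # e.g. created once in the consumer's connect()
    try:
        {}["user"]  # simulate a handler bug
    except KeyError:
        log.exception("WebSocket handler error")  # tagged line, then the full traceback
    print(stream.getvalue())
```

With this format in place, `grep 'conn=abc123'` returns one connection's history, including the traceback header line.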

**Step 2 — Reproduce with a minimal test**

```python
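# Send the message type that triggers the crash and report how the server closes:
import asyncio

import websockets
from websockets.exceptions import ConnectionClosed

async def reproduce():
    async with websockets.connect('ws://localhost:8001/ws') as ws:
        try:
            await ws.send('{"type": "trigger_crash", "data": null}')
            response = await ws.recv()
            print(response)
        except ConnectionClosed as exc:
            # str(exc) describes the close frame, including its code (e.g. 1011) if one arrived
            print(f'Connection closed: {exc}')

asyncio.run(reproduce())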
```
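If a live server is inconvenient, the same class of crash (the `KeyError`/`AttributeError` bugs listed under root causes) can often be reproduced offline by calling the handler's logic directly with a batch of edge-case payloads. A sketch; `handle_message` below is a hypothetical stand-in for your own handler logic, and the payload list is only a starting point.

```python
import json

def handle_message(data: dict) -> str:
    """Hypothetical handler logic under test: written for the happy path only."""
    return data["user"]["name"].upper()

# Payloads that commonly break handlers written for well-formed input.
EDGE_CASES = [
    '{"user": {"name": "ada"}}',  # valid
    '{}',                         # missing key
    '{"user": null}',             # null where an object is expected
    '{"user": {"name": 5}}',      # wrong type
    '[]',                         # valid JSON, wrong shape
    'not json',                   # invalid JSON
]

def find_crashing_payloads(handler, payloads):
    """Return (payload, exception type name) for every payload that makes `handler` raise."""
    crashes = []
    for raw in payloads:
        try:
            handler(json.loads(raw))
        except Exception as exc:  # deliberately broad: we want to see every crash
            crashes.append((raw, type(exc).__name__))
    return crashes

if __name__ == "__main__":
    for raw, error in find_crashing_payloads(handle_message, EDGE_CASES):
        print(f"{error:<16} {raw}")
```

Each payload that shows up in the output is a candidate for the `trigger_crash` message in the live reproduction above.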

**Step 3 — Check for OOM (Out of Memory) kills**

```bash
# Look for OOM killer events:
sudo dmesg | grep -i 'killed process\|out of memory' | tail -20

# Check system memory usage at time of crash:
sudo journalctl --since '10 minutes ago' | grep -i 'oom\|memory'

# Watch per-process resident memory (RSS, in KiB) while connections ramp up:
watch -n 5 'ps -eo pid,rss,args --sort=-rss | head -n 15'
```
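To see which code paths hold the memory inside the server process itself, the standard library's `tracemalloc` can diff two snapshots taken some time apart. A sketch under assumptions: in a real server you would start tracing at startup (for example with the `PYTHONTRACEMALLOC=1` environment variable or an early `tracemalloc.start()`) and take the snapshots from a debug hook of your own (not shown); the `bytearray` list only stands in for per-connection buffers, and `top_growth` is a hypothetical helper.

```python
import tracemalloc

def top_growth(before: tracemalloc.Snapshot, after: tracemalloc.Snapshot, limit: int = 5):
    """Return the source lines whose allocated memory grew the most between two snapshots."""
    stats = after.compare_to(before, "lineno")  # sorted by size of the change, largest first
    return [s for s in stats if s.size_diff > 0][:limit]

if __name__ == "__main__":
    tracemalloc.start()
    before = tracemalloc.take_snapshot()

    # Stand-in for per-connection buffers that keep growing.
    buffers = [bytearray(10_000) for _ in range(1_000)]

    after = tracemalloc.take_snapshot()
    for stat in top_growth(before, after):
        print(stat)  # file:line followed by size and count deltas
    tracemalloc.stop()
```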

**Step 4 — Identify race conditions**

```python
# Unsafe shared state (common pattern):
connected_users = {}  # shared dict: racy when a handler awaits between reading and updating it

# Safer alternative: guard read-modify-write sequences with an asyncio.Lock
import asyncio

_lock = asyncio.Lock()
_connected_users: dict[str, 'MyConsumer'] = {}

async def add_user(user_id: str, consumer: 'MyConsumer') -> None:
    async with _lock:  # hold the lock across any awaits inside the critical section
        _connected_users[user_id] = consumer
```
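The lock only matters when a handler awaits between reading and writing shared state; on a single event loop, code with no `await` in between cannot be interleaved. A runnable sketch of that read-await-write race, where `asyncio.sleep(0)` is an assumed stand-in for any real await (a database call, `send()`, and so on) and the room counter is a hypothetical example of shared state.

```python
import asyncio

async def join_unsafe(counts: dict[str, int], room: str) -> None:
    current = counts.get(room, 0)  # read shared state
    await asyncio.sleep(0)         # any await lets other handlers run here
    counts[room] = current + 1     # write back a value computed from a stale read

async def join_safe(counts: dict[str, int], room: str, lock: asyncio.Lock) -> None:
    async with lock:               # one handler at a time through read-await-write
        current = counts.get(room, 0)
        await asyncio.sleep(0)
        counts[room] = current + 1

async def simulate(n: int = 100) -> tuple[int, int]:
    """Run n concurrent joins each way; return (unsafe_count, safe_count)."""
    unsafe: dict[str, int] = {}
    await asyncio.gather(*(join_unsafe(unsafe, "lobby") for _ in range(n)))

    safe: dict[str, int] = {}
    lock = asyncio.Lock()          # created inside the running event loop
    await asyncio.gather(*(join_safe(safe, "lobby", lock) for _ in range(n)))
    return unsafe["lobby"], safe["lobby"]

if __name__ == "__main__":
    lost_updates, correct = asyncio.run(simulate())
    print(f"unsafe: {lost_updates}, safe: {correct}")  # unsafe far below 100; safe exactly 100
```

An `asyncio.Lock` only coordinates handlers inside one process. With several workers, each has its own copy of the dict and its own lock, so truly shared state has to live in an external store such as a channel layer or Redis.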

**Step 5 — Check serialization errors**

```python
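# Test JSON serialization of the broadcast payload before sending it:
import json
import logging

logger = logging.getLogger(__name__)
your_data = {'price': 101.25}  # placeholder: the payload you are about to broadcast

try:
    payload = json.dumps(your_data)
except (TypeError, ValueError) as e:
    logger.error('Serialization failed: %s (data type: %s)', e, type(your_data).__name__)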
```
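Once the failing type is known, `json.dumps` accepts a `default=` callable that is invoked for any object it cannot encode natively. A sketch; which types to convert, and encoding `Decimal` as a string to preserve precision, are assumptions to adapt to what your clients expect. `to_jsonable` is a hypothetical helper name.

```python
import json
from datetime import date, datetime, timezone
from decimal import Decimal
from uuid import UUID

def to_jsonable(obj):
    """Fallback for json.dumps: convert a few common non-JSON types, reject the rest."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, (Decimal, UUID)):
        return str(obj)               # string keeps Decimal precision intact
    if isinstance(obj, (set, frozenset)):
        return sorted(obj, key=str)   # deterministic order for clients and tests
    # Anything else is still an error, so unexpected types stay visible in the logs.
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

if __name__ == "__main__":
    payload = {
        "price": Decimal("101.25"),
        "at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
        "tags": {"b", "a"},
    }
    print(json.dumps(payload, default=to_jsonable))
```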

Solutions

**Fix 1 — Wrap WebSocket handlers with error boundaries**

```python
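# Django Channels consumer:
import json
import logging

from channels.generic.websocket import AsyncWebsocketConsumer

logger = logging.getLogger(__name__)

class SafeConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data=None, bytes_data=None) -> None:
        if text_data is None:
            return  # binary frames are not handled in this example
        try:
            data = json.loads(text_data)
            await self.handle_message(data)
        except json.JSONDecodeError:
            # Recoverable: tell the client and keep the connection open
            await self.send(json.dumps({'error': 'invalid_json'}))
        except Exception:
            logger.exception('WebSocket handler error')
            # Close explicitly with 1011 instead of letting the exception
            # propagate out of the consumer
            await self.close(code=1011)

    async def handle_message(self, data: dict) -> None:
        ...  # application-specific logic (placeholder)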
```
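Closing with 1011 on every handler error still lets one bad message end the session. An alternative worth considering, sketched framework-free so it runs without Channels: answer a failed message on the same connection and close with 1011 only after several consecutive failures. The `ErrorBoundary` class, the `send`/`close` callables, the error payload shape, and the threshold of 3 are all assumptions for illustration.

```python
import asyncio
import json
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

class ErrorBoundary:
    """Hypothetical per-connection wrapper around a message handler."""

    def __init__(self, handler: Callable[[dict], Awaitable[None]],
                 send: Callable[[str], Awaitable[None]],
                 close: Callable[[int], Awaitable[None]],
                 max_consecutive_errors: int = 3) -> None:
        self.handler = handler
        self.send = send
        self.close = close
        self.max_consecutive_errors = max_consecutive_errors
        self.consecutive_errors = 0
        self.closed = False

    async def dispatch(self, raw: str) -> None:
        if self.closed:
            return
        try:
            await self.handler(json.loads(raw))
            self.consecutive_errors = 0  # a good message resets the count
        except Exception:
            self.consecutive_errors += 1
            logger.exception("WebSocket handler error (%d in a row)", self.consecutive_errors)
            if self.consecutive_errors >= self.max_consecutive_errors:
                self.closed = True
                await self.close(1011)   # persistent failure: give up on this connection
            else:
                await self.send(json.dumps({"error": "message_failed"}))

if __name__ == "__main__":
    async def demo() -> None:
        async def handler(data: dict) -> None:
            data["user"]  # raises KeyError for payloads without "user"

        async def send(text: str) -> None:
            print("send:", text)

        async def close(code: int) -> None:
            print("close:", code)

        boundary = ErrorBoundary(handler, send, close)
        for raw in ['{"user": 1}', '{}', '{}', '{}']:
            await boundary.dispatch(raw)  # ok, error reply, error reply, close 1011

    asyncio.run(demo())
```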

**Fix 2 — Implement client-side auto-reconnect with exponential backoff**

```javascript
function createWebSocket(url, onMessage) {
  let delay = 1000; // start at 1 s
  const MAX_DELAY = 30_000;

  function connect() {
    const ws = new WebSocket(url);
    ws.onmessage = onMessage;
    ws.onclose = (e) => {
      console.log(`Closed ${e.code}, reconnecting in ${delay}ms`);
      setTimeout(connect, delay);
      delay = Math.min(delay * 2, MAX_DELAY); // exponential backoff, capped
    };
    ws.onopen = () => { delay = 1000; }; // reset on success
  }
  connect();
}
```
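The client above retries on a fixed doubling schedule, so when a crashed worker drops all of its connections at once (one of the symptoms listed earlier), every client comes back at the same moments. Adding random jitter spreads those reconnects out. A sketch of the delay schedule in Python using the common "full jitter" variant; the 1 s base and 30 s cap mirror the JavaScript example, and the `reconnect_delays` generator is a hypothetical helper.

```python
import random
from itertools import islice
from typing import Callable, Iterator

def reconnect_delays(base_ms: float = 1000, cap_ms: float = 30_000,
                     rand: Callable[[], float] = random.random) -> Iterator[float]:
    """Yield full-jitter delays: uniform in [0, window), where the window doubles up to cap_ms."""
    window = base_ms
    while True:
        yield rand() * window
        window = min(cap_ms, window * 2)  # double the window, never past the cap

if __name__ == "__main__":
    # Start a fresh generator after every successful connection,
    # just as the JavaScript client resets `delay` in onopen.
    delays = reconnect_delays()
    print([round(d) for d in islice(delays, 6)])
```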

**Fix 3 — Limit per-connection memory to prevent OOM**

```python
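# Cap message history or buffer per connection:
import json
from collections import deque

from channels.generic.websocket import AsyncWebsocketConsumer

MAX_QUEUE_SIZE = 100

class BoundedConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A deque with maxlen drops the oldest item automatically once full
        # (O(1), unlike list.pop(0))
        self.message_queue: deque[dict] = deque(maxlen=MAX_QUEUE_SIZE)

    async def receive(self, text_data=None, bytes_data=None) -> None:
        if text_data is None:
            return  # binary frames are ignored in this example
        # json.loads can raise: combine with the error boundary from Fix 1
        self.message_queue.append(json.loads(text_data))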
```
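Capping by item count still lets a handful of very large messages exhaust memory. A variant sketch that caps the buffer by total bytes instead; the 1 MiB default budget, measuring size as the UTF-8 length of the raw text, and the `ByteBoundedBuffer` name are assumptions.

```python
from collections import deque

class ByteBoundedBuffer:
    """Hypothetical FIFO buffer that evicts the oldest messages once a byte budget is exceeded."""

    def __init__(self, max_bytes: int = 1 << 20) -> None:
        self.max_bytes = max_bytes
        self.total_bytes = 0
        self._items: deque[tuple[str, int]] = deque()  # (message, size in bytes)

    def append(self, message: str) -> bool:
        size = len(message.encode("utf-8"))
        if size > self.max_bytes:
            return False                  # one message larger than the whole budget: reject it
        self._items.append((message, size))
        self.total_bytes += size
        while self.total_bytes > self.max_bytes:
            _, evicted_size = self._items.popleft()  # drop the oldest first
            self.total_bytes -= evicted_size
        return True

    def messages(self) -> list[str]:
        return [m for m, _ in self._items]

    def __len__(self) -> int:
        return len(self._items)
```

Storing each message's size next to it keeps eviction cheap, since nothing has to be re-encoded when it is dropped.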

Prevention

- Always wrap WebSocket message handlers in try/except to prevent a single bad message from crashing the connection
- Implement client-side auto-reconnect with exponential backoff for all WebSocket connections
- Use a circuit breaker pattern on the server to shed load before OOM rather than crashing
- Log the full exception traceback with correlation IDs to quickly identify crash causes in production
- Test crash recovery in staging by deliberately injecting malformed messages
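A full circuit breaker is beyond a short example, but a simpler form of load shedding in the same spirit is a per-worker admission cap that turns new connections away once the worker holds as many as it can safely serve. A sketch; `ConnectionGate` is a hypothetical helper, the limit is yours to size from measured per-connection memory, and the Channels wiring in the comments is an untested assumption about how it could be hooked in.

```python
class ConnectionGate:
    """Hypothetical per-process cap on concurrent WebSocket connections.

    try_acquire() never awaits between its check and its increment, so handlers
    on one asyncio event loop can call it without a lock.
    """

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0

    def try_acquire(self) -> bool:
        if self.active >= self.limit:
            return False
        self.active += 1
        return True

    def release(self) -> None:
        if self.active > 0:
            self.active -= 1

# Possible wiring in a Channels consumer (sketch, not tested here):
#
#   GATE = ConnectionGate(limit=500)   # arbitrary example value
#
#   async def connect(self):
#       self.admitted = GATE.try_acquire()
#       await self.accept()
#       if not self.admitted:
#           await self.close(code=1013)   # 1013 = "Try Again Later"
#
#   async def disconnect(self, close_code):
#       if getattr(self, "admitted", False):
#           GATE.release()
```

Rejecting with 1013 gives well-behaved clients (such as the backoff client in Fix 2) a clear signal to retry later, instead of the whole worker dying with 1006 for everyone.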
