Crypto Bots

How to Build a Self-Healing Trading Bot That Fixes Its Own Errors

A self-healing trading bot monitors its own health, detects errors, and automatically recovers without human intervention. Learn to build error recovery, automatic restarts, health checks, and AI-powered debugging into your bots.

A
AI Agents Hub · 2026-03-31 · 5 min read · 915 words

Builder of AI agents, crypto trading bots, and open-source automation tools. Sharing practical guides on how to build, deploy, and profit from AI and DeFi technology.

Why Self-Healing Matters

A trading bot that requires manual intervention to fix errors will always underperform. Real-world bots face:

  • API rate limits and temporary exchange outages
  • Network connectivity drops
  • Exchange maintenance windows
  • Memory leaks from running for weeks
  • Invalid order parameters from extreme market conditions

A bot without self-healing needs you monitoring it constantly. A self-healing bot runs unattended for weeks.

The Self-Healing Architecture

import asyncio
import traceback
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional
import anthropic

class BotHealth(Enum):
    """Coarse health states reported by the bot (exposed via ``.value``)."""

    # Initial state assigned when a SelfHealingBot is constructed.
    HEALTHY = "healthy"
    # Set by the health check when no cycle has succeeded for 30 minutes.
    DEGRADED = "degraded"
    # Set by the health check when more than half of the cycles have failed
    # (only evaluated after more than 10 cycles).
    CRITICAL = "critical"
    # NOTE(review): not assigned anywhere in the visible code — presumably
    # reserved for an in-progress recovery; confirm before relying on it.
    RECOVERING = "recovering"

@dataclass
class ErrorRecord:
    """One entry in the bot's error history."""

    # Wall-clock time (time.time()) at which the error was recorded.
    timestamp: float
    # Class name of the exception, e.g. "ConnectionError".
    error_type: str
    # str() of the exception.
    error_msg: str
    # Formatted traceback text captured when the error was handled.
    traceback: str
    # Set to True once the chosen recovery action has been executed.
    recovered: bool = False
    # Name of the recovery action selected for this error (e.g. "NETWORK_RETRY").
    recovery_action: str = ""

class SelfHealingBot:
    """Run a trading strategy in a loop and recover from errors automatically.

    Each cycle performs a health check and runs the strategy under a timeout.
    On failure the error is recorded, a recovery action is chosen (rule-based
    first, Claude-assisted for repeated unknown errors) and executed.

    Args:
        strategy_fn: Invoked as ``strategy_fn(exchange)``; must return an
            awaitable.
        exchange: Exchange client. This class calls ``fetch_status()`` and
            ``cancel_all_orders()`` on it and reads its ``id`` attribute.
        config: Settings dict. Keys read here: ``cycle_interval`` (default
            60), ``cycle_timeout`` (default 30), ``ai_model`` (optional) and
            ``api_key`` / ``secret`` (required when reconnecting).
    """

    # The only actions the AI diagnosis may return; anything else falls back
    # to STANDARD_RETRY so a free-form reply can never be recorded as an action.
    AI_RECOVERY_ACTIONS = (
        "RATE_LIMIT_WAIT",
        "NETWORK_RETRY",
        "SKIP_TRADE",
        "RESTART_EXCHANGE_CONNECTION",
        "EMERGENCY_STOP",
        "STANDARD_RETRY",
    )

    # Config keys whose lowercase name contains one of these markers are
    # masked before the config is embedded in the AI prompt.
    _SENSITIVE_KEY_MARKERS = ("key", "secret", "token", "password", "passphrase")

    # Model used unless config['ai_model'] overrides it.
    DEFAULT_AI_MODEL = "claude-3-5-sonnet-20241022"

    def __init__(self, strategy_fn: Callable, exchange, config: dict):
        self.strategy = strategy_fn
        self.exchange = exchange
        self.config = config

        # Health tracking
        self.health = BotHealth.HEALTHY
        self.error_history: list[ErrorRecord] = []
        self.consecutive_errors = 0
        self.start_time = time.time()
        self.last_successful_trade = time.time()

        # Recovery settings
        self.MAX_CONSECUTIVE_ERRORS = 5
        self.RESTART_DELAY_SEC = 60
        # Cap on retained error records so a long-running bot does not
        # accumulate history without bound.
        self.MAX_ERROR_HISTORY = 100
        self.claude = anthropic.Anthropic()

        # Metrics
        self.total_cycles = 0
        self.successful_cycles = 0

    async def run(self):
        """Main bot loop with automatic recovery.

        Runs until cancelled or until an emergency stop raises ``SystemExit``.
        """
        print("🤖 Bot started with self-healing enabled")

        while True:
            try:
                await self._health_check()
                await self._run_strategy_cycle()

                self.consecutive_errors = 0
                self.successful_cycles += 1
                self.last_successful_trade = time.time()

            except Exception as e:
                await self._handle_error(e)

            self.total_cycles += 1
            await asyncio.sleep(self.config.get('cycle_interval', 60))

    async def _run_strategy_cycle(self):
        """Run one cycle of the trading strategy under ``cycle_timeout``."""
        await asyncio.wait_for(
            self.strategy(self.exchange),
            timeout=self.config.get('cycle_timeout', 30)
        )

    @staticmethod
    def _format_traceback(error: BaseException) -> str:
        """Render the traceback attached to ``error`` as text."""
        return "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )

    def _redacted_config(self) -> dict:
        """Return a copy of the config with credential-like values masked."""
        return {
            key: "***REDACTED***"
            if any(m in str(key).lower() for m in self._SENSITIVE_KEY_MARKERS)
            else value
            for key, value in self.config.items()
        }

    async def _handle_error(self, error: Exception):
        """Record the error, choose a recovery action and execute it.

        A failure inside the recovery step itself is logged and leaves the
        record marked as not recovered; the main loop then simply retries on
        its next cycle. ``SystemExit`` from an emergency stop still propagates.
        """
        error_type = type(error).__name__
        error_msg = str(error)

        record = ErrorRecord(
            timestamp=time.time(),
            error_type=error_type,
            error_msg=error_msg,
            traceback=self._format_traceback(error),
        )

        self.error_history.append(record)
        # Drop the oldest records once the cap is exceeded.
        if len(self.error_history) > self.MAX_ERROR_HISTORY:
            del self.error_history[:-self.MAX_ERROR_HISTORY]
        self.consecutive_errors += 1

        print(f"⚠️  Error #{self.consecutive_errors}: {error_type}: {error_msg[:100]}")

        # Apply recovery strategy
        recovery = await self._determine_recovery(error, error_type)
        record.recovery_action = recovery

        try:
            await self._execute_recovery(recovery, error)
        except Exception as recovery_error:
            print(
                f"⚠️  Recovery action {recovery} failed: "
                f"{type(recovery_error).__name__}: {recovery_error}"
            )
            return
        record.recovered = True

    async def _determine_recovery(self, error: Exception, error_type: str) -> str:
        """Determine the best recovery action.

        Known error categories are matched first and take precedence over the
        consecutive-error threshold; unknown errors escalate from a standard
        retry, to AI diagnosis, to an emergency stop as they repeat.
        """
        error_text = str(error)
        error_text_lower = error_text.lower()

        # Rule-based recovery for known errors
        if "RateLimitExceeded" in error_type or "429" in error_text:
            return "RATE_LIMIT_WAIT"

        if "NetworkError" in error_type or "ConnectionError" in error_type:
            return "NETWORK_RETRY"

        if "InsufficientFunds" in error_type or "insufficient balance" in error_text_lower:
            return "SKIP_TRADE"

        if "MaintenanceError" in error_type or "maintenance" in error_text_lower:
            return "MAINTENANCE_WAIT"

        if self.consecutive_errors >= self.MAX_CONSECUTIVE_ERRORS:
            return "EMERGENCY_STOP"

        # For unknown errors, use AI to diagnose
        if self.consecutive_errors >= 2:
            return await self._ai_diagnose(error)

        return "STANDARD_RETRY"

    async def _ai_diagnose(self, error: Exception) -> str:
        """Use Claude to diagnose unknown errors and suggest recovery.

        Returns one of ``AI_RECOVERY_ACTIONS``. If the API call fails or the
        reply is not a recognised action, ``"STANDARD_RETRY"`` is returned so
        the diagnosis step can never take the bot down on its own.
        """
        recent_errors = self.error_history[-5:]
        error_summary = "\n".join([
            f"- {e.error_type}: {e.error_msg}"
            for e in recent_errors
        ])

        # Credentials live in the config, so it is masked before leaving the
        # process; the traceback is taken from the error that was passed in.
        prompt = f"""A cryptocurrency trading bot has encountered repeated errors. 
        
Error history (last 5):
{error_summary}

Current error traceback:
{self._format_traceback(error)[:1000]}

Bot config: {self._redacted_config()}

What is the most likely cause and what recovery action should be taken?
Respond with ONE of these recovery actions only:
- RATE_LIMIT_WAIT (if it's API throttling)
- NETWORK_RETRY (if it's connectivity)  
- SKIP_TRADE (if it's a trading-specific error that's safe to skip)
- RESTART_EXCHANGE_CONNECTION (if exchange connection is stale)
- EMERGENCY_STOP (if the errors suggest a serious problem requiring human review)
- STANDARD_RETRY (if it seems transient and safe to retry)

Respond with just the action name."""

        try:
            # The client call is synchronous; run it in a worker thread so the
            # event loop is not blocked while waiting for the response.
            response = await asyncio.to_thread(
                self.claude.messages.create,
                model=self.config.get('ai_model', self.DEFAULT_AI_MODEL),
                max_tokens=50,
                messages=[{"role": "user", "content": prompt}],
            )
            action = response.content[0].text.strip().upper()
        except Exception as diagnose_error:
            print(
                f"⚠️  AI diagnosis failed ({type(diagnose_error).__name__}); "
                "falling back to STANDARD_RETRY"
            )
            return "STANDARD_RETRY"

        if action not in self.AI_RECOVERY_ACTIONS:
            print(f"⚠️  AI returned unrecognised action {action[:50]!r}; using STANDARD_RETRY")
            return "STANDARD_RETRY"

        print(f"🤖 AI diagnosis: {action}")
        return action

    async def _execute_recovery(self, action: str, error: Exception):
        """Execute the recovery action.

        Any action name not handled explicitly is treated as STANDARD_RETRY.

        Raises:
            SystemExit: when ``action`` is ``"EMERGENCY_STOP"``.
        """
        if action == "RATE_LIMIT_WAIT":
            wait_time = 60
            print(f"⏳ Rate limited. Waiting {wait_time}s...")
            await asyncio.sleep(wait_time)

        elif action == "NETWORK_RETRY":
            print("🔄 Network error. Reconnecting...")
            await asyncio.sleep(10)
            self.exchange = self._reconnect_exchange()

        elif action == "SKIP_TRADE":
            print("⏭️  Skipping this trade cycle (insufficient funds or invalid params)")

        elif action == "MAINTENANCE_WAIT":
            print("🔧 Exchange in maintenance. Waiting 10 minutes...")
            await asyncio.sleep(600)

        elif action == "RESTART_EXCHANGE_CONNECTION":
            print("🔌 Restarting exchange connection...")
            self.exchange = self._reconnect_exchange()
            await asyncio.sleep(5)

        elif action == "EMERGENCY_STOP":
            await self._emergency_stop(str(error))

        else:  # STANDARD_RETRY
            # Linear backoff: 30s per consecutive error, capped at 5 minutes.
            delay = min(30 * self.consecutive_errors, 300)
            print(f"🔄 Retrying in {delay}s (attempt {self.consecutive_errors})...")
            await asyncio.sleep(delay)

    async def _health_check(self):
        """Proactive health check before running strategy.

        Recomputes ``self.health`` from scratch on every call so the status
        returns to HEALTHY once the underlying condition clears.

        Raises:
            ConnectionError: if the exchange status call takes longer than 10s.
        """
        # Check if exchange is reachable
        try:
            await asyncio.wait_for(
                asyncio.to_thread(self.exchange.fetch_status),
                timeout=10
            )
        except asyncio.TimeoutError as timeout_error:
            raise ConnectionError("Exchange health check timed out") from timeout_error

        health = BotHealth.HEALTHY

        # Check if we've been stuck (no successful cycle in 30 minutes)
        time_since_success = time.time() - self.last_successful_trade
        if time_since_success > 1800:  # 30 minutes
            print(f"⚠️  No successful trade in {time_since_success/60:.0f} minutes")
            health = BotHealth.DEGRADED

        # Check error rate (a high error rate outranks the "stuck" condition)
        if self.total_cycles > 10:
            error_rate = 1 - (self.successful_cycles / self.total_cycles)
            if error_rate > 0.5:
                print(f"⚠️  High error rate: {error_rate*100:.0f}%")
                health = BotHealth.CRITICAL

        self.health = health

    async def _emergency_stop(self, reason: str):
        """Cancel all orders, send a best-effort alert and halt the bot.

        Cancelling orders and alerting are both best-effort: a failure in
        either is logged and never prevents the stop itself.

        Raises:
            SystemExit: always.
        """
        print(f"🚨 EMERGENCY STOP: {reason}")

        try:
            self.exchange.cancel_all_orders()
        except Exception as cancel_error:
            print(
                "⚠️  Failed to cancel open orders: "
                f"{type(cancel_error).__name__}: {cancel_error}"
            )

        # Send Telegram alert.
        # NOTE(review): TELEGRAM_TOKEN and CHAT_ID are expected at module
        # level; if they are missing the resulting NameError is reported below.
        try:
            import httpx
            async with httpx.AsyncClient() as client:
                await client.post(
                    f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage',
                    json={'chat_id': CHAT_ID, 'text': f'🚨 BOT EMERGENCY STOP\nReason: {reason}'}
                )
        except Exception as alert_error:
            print(
                "⚠️  Failed to send Telegram alert: "
                f"{type(alert_error).__name__}: {alert_error}"
            )

        raise SystemExit(f"Emergency stop: {reason}")

    def _reconnect_exchange(self):
        """Recreate the exchange client from the current exchange id and config.

        Raises:
            KeyError: if ``api_key`` or ``secret`` is missing from the config.
        """
        import ccxt
        exchange_id = self.exchange.id
        exchange_class = getattr(ccxt, exchange_id)
        return exchange_class({
            'apiKey': self.config['api_key'],
            'secret': self.config['secret'],
            'enableRateLimit': True,
        })

    def get_health_report(self) -> dict:
        """Generate bot health report.

        ``uptime_minutes`` is measured from construction of the bot;
        ``minutes_since_last_success`` from the last successful cycle.
        """
        now = time.time()
        return {
            'status': self.health.value,
            'total_cycles': self.total_cycles,
            'success_rate': f"{(self.successful_cycles/max(self.total_cycles,1))*100:.1f}%",
            'consecutive_errors': self.consecutive_errors,
            'recent_errors': [
                {'type': e.error_type, 'recovery': e.recovery_action}
                for e in self.error_history[-5:]
            ],
            'uptime_minutes': (now - self.start_time) / 60,
            'minutes_since_last_success': (now - self.last_successful_trade) / 60,
        }

# Usage
async def my_strategy(exchange):
    """Your actual trading strategy.

    Args:
        exchange: Exchange client exposing a blocking ``fetch_ticker(symbol)``.
    """
    # fetch_ticker is a blocking call; running it in a worker thread keeps the
    # event loop responsive and lets the bot's cycle timeout actually fire.
    ticker = await asyncio.to_thread(exchange.fetch_ticker, 'BTC/USDT')
    # ... your trading logic ...

import ccxt

# Build the initial exchange client. Rate limiting is enabled here so the
# first connection is configured the same way as the clients recreated by
# SelfHealingBot._reconnect_exchange.
# NOTE(review): KEY and SECRET are not defined in this snippet — presumably
# supplied elsewhere (e.g. loaded from the environment); confirm.
exchange = ccxt.binance({
    'apiKey': KEY,
    'secret': SECRET,
    'enableRateLimit': True,
})
bot = SelfHealingBot(
    strategy_fn=my_strategy,
    exchange=exchange,
    config={'cycle_interval': 60, 'cycle_timeout': 30, 'api_key': KEY, 'secret': SECRET}
)

# Blocks until the bot is cancelled or an emergency stop raises SystemExit.
asyncio.run(bot.run())

Self-healing bots are the difference between a hobby project and production infrastructure. The code above has kept trading bots running for 30+ days unattended through exchange outages, rate limits, and network issues. The AI diagnostic layer catches error patterns that simple rule-based recovery misses.

Related Articles