Liquidation Monitoring Guide
Build a production-ready liquidation monitoring system that tracks DeFi positions across Aave, Compound, Morpho, and Spark. This guide covers real-time streaming, position analysis, and execution strategies.
Overview
DeFi liquidations occur when a borrower's collateral value drops below their debt obligations. Monitoring these positions provides profitable MEV opportunities with typical profits of $50-$5,000+ per liquidation.
Supported Protocols
| Protocol | Chains | Liquidation Bonus |
|---|---|---|
| Aave V2 | ethereum, polygon | 5-15% |
| Aave V3 | ethereum, polygon, arbitrum, optimism, base | 4-10% |
| Compound V2 | ethereum | 5-8% |
| Compound V3 | ethereum, polygon, arbitrum | 5% |
| Morpho | ethereum | Variable |
| Spark | ethereum | 4-10% |
Prerequisites
- Axol API key (Get one free)
- Python 3.8+ or Node.js 16+
- Understanding of DeFi lending mechanics
- (Optional) Flash loan provider for capital-efficient liquidations
Understanding Health Factors
The health factor determines liquidation eligibility:
| Status | Health Factor | Risk | Action |
|---|---|---|---|
HEALTHY | HF > 1.5 | Low | No action needed |
WARNING | 1.2 < HF <= 1.5 | Medium | Monitor closely |
AT_RISK | 1 < HF <= 1.2 | High | Approaching liquidation |
LIQUIDATABLE | HF <= 1 | Critical | Can be liquidated |
Formula:
Health Factor = (Collateral Value * Liquidation Threshold) / Debt Value
When health factor drops below 1.0, anyone can liquidate up to 50% of the debt and receive a bonus from the collateral.
Quick Start: Check At-Risk Positions
REST API
Python:
import requests
import os
API_KEY = os.getenv('AXOL_API_KEY')
BASE_URL = 'https://api.axol.io/api/v1'
headers = {'X-API-Key': API_KEY}
# Get positions at risk across all supported protocols
response = requests.get(
f'{BASE_URL}/defi/positions/at-risk',
headers=headers,
params={
'min_debt_usd': 1000,
'max_health_factor': 1.2,
'limit': 20
}
)
positions = response.json()
for pos in positions['positions']:
print(f"Protocol: {pos['protocol']}")
print(f" Borrower: {pos['borrower']}")
print(f" Health Factor: {pos['health_factor']:.4f}")
print(f" Debt: ${pos['total_debt_usd']:,.2f}")
print(f" Collateral: ${pos['total_collateral_usd']:,.2f}")
print(f" Status: {pos['status']}")
print()
TypeScript:
const API_KEY = process.env.AXOL_API_KEY;
const BASE_URL = 'https://api.axol.io/api/v1';
const response = await fetch(
`${BASE_URL}/defi/positions/at-risk?min_debt_usd=1000&max_health_factor=1.2&limit=20`,
{ headers: { 'X-API-Key': API_KEY } }
);
const positions = await response.json();
for (const pos of positions.positions) {
console.log(`Protocol: ${pos.protocol}`);
console.log(` Borrower: ${pos.borrower}`);
console.log(` Health Factor: ${pos.health_factor.toFixed(4)}`);
console.log(` Debt: $${pos.total_debt_usd.toLocaleString()}`);
console.log(` Status: ${pos.status}`);
}
Real-Time Streaming
For MEV extraction, real-time monitoring is essential. Use WebSocket streaming to receive instant updates.
Get JWT Token
WebSocket connections require JWT authentication:
import requests
# Exchange API key for JWT token
auth_response = requests.post(
f'{BASE_URL}/auth/token',
headers={'X-API-Key': API_KEY},
json={'grant_type': 'client_credentials'}
)
jwt_token = auth_response.json()['access_token']
Stream Position Updates
Python:
import asyncio
import websockets
import json
async def stream_liquidations(jwt_token: str):
url = f"wss://api.axol.io/api/v1/defi/liquidations/stream?token={jwt_token}"
async with websockets.connect(url) as ws:
# Subscribe with filters
await ws.send(json.dumps({
'type': 'subscribe',
'filters': {
'protocols': ['aave_v3', 'compound_v3'],
'min_debt_usd': 5000,
'max_health_factor': 1.1,
'chains': ['ethereum', 'arbitrum']
}
}))
async for message in ws:
data = json.loads(message)
if data['type'] == 'position_update':
pos = data['data']
print(f"[{pos['status']}] {pos['protocol']} - HF: {pos['health_factor']:.4f}")
print(f" Borrower: {pos['borrower']}")
print(f" Debt: ${pos['total_debt_usd']:,.2f}")
if pos['status'] == 'LIQUIDATABLE':
print(f" >>> LIQUIDATION OPPORTUNITY <<<")
print(f" Est. Profit: ${pos.get('estimated_profit_usd', 0):,.2f}")
elif data['type'] == 'liquidation_executed':
liq = data['data']
print(f"[EXECUTED] {liq['tx_hash']}")
print(f" Liquidator: {liq['liquidator']}")
print(f" Profit: ${liq['profit_usd']:,.2f}")
asyncio.run(stream_liquidations(jwt_token))
TypeScript:
import WebSocket from 'ws';
interface PositionUpdate {
type: 'position_update';
data: {
protocol: string;
borrower: string;
health_factor: number;
total_debt_usd: number;
total_collateral_usd: number;
status: 'HEALTHY' | 'WARNING' | 'AT_RISK' | 'LIQUIDATABLE';
estimated_profit_usd?: number;
};
}
async function streamLiquidations(jwtToken: string) {
const ws = new WebSocket(
`wss://api.axol.io/api/v1/defi/liquidations/stream?token=${jwtToken}`
);
ws.on('open', () => {
ws.send(JSON.stringify({
type: 'subscribe',
filters: {
protocols: ['aave_v3', 'compound_v3'],
min_debt_usd: 5000,
max_health_factor: 1.1
}
}));
});
ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'position_update') {
const pos = msg.data;
console.log(`[${pos.status}] ${pos.protocol} - HF: ${pos.health_factor.toFixed(4)}`);
if (pos.status === 'LIQUIDATABLE') {
console.log(`>>> LIQUIDATION OPPORTUNITY <<<`);
console.log(`Est. Profit: $${pos.estimated_profit_usd?.toLocaleString()}`);
}
}
});
ws.on('error', console.error);
}
Production Liquidation Bot
Here's a complete liquidation monitoring system:
Python:
import asyncio
import aiohttp
import websockets
import json
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('liquidation_bot')
@dataclass
class LiquidationConfig:
api_key: str
min_profit_usd: float = 50.0
min_debt_usd: float = 1000.0
max_health_factor: float = 1.05
protocols: list = field(default_factory=lambda: ['aave_v3', 'compound_v3'])
chains: list = field(default_factory=lambda: ['ethereum'])
class LiquidationMonitor:
def __init__(self, config: LiquidationConfig):
self.config = config
self.base_url = 'https://api.axol.io/api/v1'
self.jwt_token: Optional[str] = None
self.on_opportunity: Optional[Callable] = None
self._running = False
async def _get_jwt_token(self) -> str:
"""Exchange API key for JWT token."""
async with aiohttp.ClientSession() as session:
async with session.post(
f'{self.base_url}/auth/token',
headers={'X-API-Key': self.config.api_key},
json={'grant_type': 'client_credentials'}
) as resp:
data = await resp.json()
return data['access_token']
async def _refresh_token_loop(self):
"""Refresh JWT token every 50 minutes."""
while self._running:
await asyncio.sleep(3000) # 50 minutes
try:
self.jwt_token = await self._get_jwt_token()
logger.info("JWT token refreshed")
except Exception as e:
logger.error(f"Token refresh failed: {e}")
async def get_at_risk_positions(self) -> list:
"""Fetch current at-risk positions via REST API."""
async with aiohttp.ClientSession() as session:
async with session.get(
f'{self.base_url}/defi/positions/at-risk',
headers={'X-API-Key': self.config.api_key},
params={
'min_debt_usd': self.config.min_debt_usd,
'max_health_factor': self.config.max_health_factor,
'protocols': ','.join(self.config.protocols)
}
) as resp:
data = await resp.json()
return data.get('positions', [])
async def _handle_position(self, position: dict):
"""Process a position update."""
if position['status'] == 'LIQUIDATABLE':
profit = position.get('estimated_profit_usd', 0)
if profit >= self.config.min_profit_usd:
logger.info(
f"Liquidation opportunity: {position['protocol']} "
f"${profit:,.2f} profit"
)
if self.on_opportunity:
await self.on_opportunity(position)
async def _stream_positions(self):
"""Stream position updates via WebSocket."""
url = f"wss://api.axol.io/api/v1/defi/liquidations/stream?token={self.jwt_token}"
while self._running:
try:
async with websockets.connect(url) as ws:
# Subscribe with filters
await ws.send(json.dumps({
'type': 'subscribe',
'filters': {
'protocols': self.config.protocols,
'min_debt_usd': self.config.min_debt_usd,
'max_health_factor': self.config.max_health_factor,
'chains': self.config.chains
}
}))
logger.info("Connected to liquidation stream")
async for message in ws:
data = json.loads(message)
if data['type'] == 'position_update':
await self._handle_position(data['data'])
elif data['type'] == 'liquidation_executed':
liq = data['data']
logger.info(
f"Liquidation executed: {liq['tx_hash'][:16]}... "
f"Profit: ${liq['profit_usd']:,.2f}"
)
except websockets.ConnectionClosed:
logger.warning("WebSocket closed, reconnecting...")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Stream error: {e}")
await asyncio.sleep(5)
async def start(self):
"""Start the liquidation monitor."""
self._running = True
self.jwt_token = await self._get_jwt_token()
logger.info(f"Starting liquidation monitor")
logger.info(f" Protocols: {self.config.protocols}")
logger.info(f" Min profit: ${self.config.min_profit_usd}")
logger.info(f" Max health factor: {self.config.max_health_factor}")
# Run token refresh and streaming concurrently
await asyncio.gather(
self._refresh_token_loop(),
self._stream_positions()
)
def stop(self):
"""Stop the monitor."""
self._running = False
# Example usage
async def handle_opportunity(position: dict):
"""Your liquidation execution logic here."""
print(f"\n{'='*50}")
print(f"LIQUIDATION OPPORTUNITY DETECTED")
print(f"{'='*50}")
print(f"Protocol: {position['protocol']}")
print(f"Borrower: {position['borrower']}")
print(f"Health Factor: {position['health_factor']:.6f}")
print(f"Debt: ${position['total_debt_usd']:,.2f}")
print(f"Collateral: ${position['total_collateral_usd']:,.2f}")
print(f"Estimated Profit: ${position.get('estimated_profit_usd', 0):,.2f}")
print(f"{'='*50}\n")
# Execution logic intentionally omitted - see "Execution Strategies" section below
# Your implementation would call your smart contract or use a flash loan provider
async def main():
config = LiquidationConfig(
api_key=os.getenv('AXOL_API_KEY'),
min_profit_usd=100,
min_debt_usd=5000,
max_health_factor=1.05,
protocols=['aave_v3', 'compound_v3', 'morpho'],
chains=['ethereum', 'arbitrum']
)
monitor = LiquidationMonitor(config)
monitor.on_opportunity = handle_opportunity
await monitor.start()
if __name__ == '__main__':
import os
asyncio.run(main())
TypeScript Production Bot
TypeScript:
import WebSocket from 'ws';
import axios from 'axios';
interface LiquidationConfig {
apiKey: string;
minProfitUsd: number;
minDebtUsd: number;
maxHealthFactor: number;
protocols: string[];
chains: string[];
}
interface Position {
protocol: string;
chain: string;
borrower: string;
health_factor: number;
total_debt_usd: number;
total_collateral_usd: number;
status: 'HEALTHY' | 'WARNING' | 'AT_RISK' | 'LIQUIDATABLE';
estimated_profit_usd?: number;
collateral_assets: Array<{ symbol: string; amount: number; value_usd: number }>;
debt_assets: Array<{ symbol: string; amount: number; value_usd: number }>;
}
class LiquidationMonitor {
private config: LiquidationConfig;
private baseUrl = 'https://api.axol.io/api/v1';
private jwtToken: string | null = null;
private ws: WebSocket | null = null;
private running = false;
onOpportunity?: (position: Position) => Promise<void>;
constructor(config: LiquidationConfig) {
this.config = config;
}
private async getJwtToken(): Promise<string> {
const response = await axios.post(
`${this.baseUrl}/auth/token`,
{ grant_type: 'client_credentials' },
{ headers: { 'X-API-Key': this.config.apiKey } }
);
return response.data.access_token;
}
async getAtRiskPositions(): Promise<Position[]> {
const response = await axios.get(`${this.baseUrl}/defi/positions/at-risk`, {
headers: { 'X-API-Key': this.config.apiKey },
params: {
min_debt_usd: this.config.minDebtUsd,
max_health_factor: this.config.maxHealthFactor,
protocols: this.config.protocols.join(',')
}
});
return response.data.positions;
}
private async handlePosition(position: Position): Promise<void> {
if (position.status === 'LIQUIDATABLE') {
const profit = position.estimated_profit_usd || 0;
if (profit >= this.config.minProfitUsd) {
console.log(`[OPPORTUNITY] ${position.protocol} - $${profit.toLocaleString()} profit`);
if (this.onOpportunity) {
await this.onOpportunity(position);
}
}
}
}
private connect(): void {
const url = `wss://api.axol.io/api/v1/defi/liquidations/stream?token=${this.jwtToken}`;
this.ws = new WebSocket(url);
this.ws.on('open', () => {
console.log('Connected to liquidation stream');
this.ws?.send(JSON.stringify({
type: 'subscribe',
filters: {
protocols: this.config.protocols,
min_debt_usd: this.config.minDebtUsd,
max_health_factor: this.config.maxHealthFactor,
chains: this.config.chains
}
}));
});
this.ws.on('message', async (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'position_update') {
await this.handlePosition(msg.data);
} else if (msg.type === 'liquidation_executed') {
console.log(`[EXECUTED] ${msg.data.tx_hash.slice(0, 16)}... Profit: $${msg.data.profit_usd}`);
}
});
this.ws.on('close', () => {
console.log('WebSocket closed');
if (this.running) {
setTimeout(() => this.connect(), 1000);
}
});
this.ws.on('error', (error) => {
console.error('WebSocket error:', error);
});
}
async start(): Promise<void> {
this.running = true;
this.jwtToken = await this.getJwtToken();
console.log('Starting liquidation monitor');
console.log(` Protocols: ${this.config.protocols.join(', ')}`);
console.log(` Min profit: $${this.config.minProfitUsd}`);
// Refresh token every 50 minutes
setInterval(async () => {
try {
this.jwtToken = await this.getJwtToken();
console.log('JWT token refreshed');
} catch (e) {
console.error('Token refresh failed:', e);
}
}, 50 * 60 * 1000);
this.connect();
}
stop(): void {
this.running = false;
this.ws?.close();
}
}
// Usage
async function main() {
const monitor = new LiquidationMonitor({
apiKey: process.env.AXOL_API_KEY!,
minProfitUsd: 100,
minDebtUsd: 5000,
maxHealthFactor: 1.05,
protocols: ['aave_v3', 'compound_v3', 'morpho'],
chains: ['ethereum', 'arbitrum']
});
monitor.onOpportunity = async (position) => {
console.log('\n' + '='.repeat(50));
console.log('LIQUIDATION OPPORTUNITY DETECTED');
console.log('='.repeat(50));
console.log(`Protocol: ${position.protocol}`);
console.log(`Borrower: ${position.borrower}`);
console.log(`Health Factor: ${position.health_factor.toFixed(6)}`);
console.log(`Debt: $${position.total_debt_usd.toLocaleString()}`);
console.log(`Est. Profit: $${position.estimated_profit_usd?.toLocaleString()}`);
console.log('='.repeat(50) + '\n');
// Execution logic intentionally omitted - see "Execution Strategies" section below
};
await monitor.start();
}
main();
Execution Strategies
Flash Loan Liquidations
Capital-efficient liquidations using flash loans:
# Conceptual example - integrate with your preferred flash loan provider
async def execute_flash_loan_liquidation(position: dict):
"""
Execute liquidation using Aave flash loan:
1. Flash borrow debt asset
2. Repay borrower's debt
3. Receive collateral bonus
4. Swap collateral to debt asset
5. Repay flash loan + fee
6. Keep profit
"""
debt_to_repay = position['total_debt_usd'] * 0.5 # Max 50% liquidation
collateral_to_receive = debt_to_repay * (1 + position['liquidation_bonus'])
flash_loan_fee = debt_to_repay * 0.0009 # Aave flash loan fee
expected_profit = collateral_to_receive - debt_to_repay - flash_loan_fee
if expected_profit > MIN_PROFIT_THRESHOLD:
# Execute via your smart contract
pass
Direct Liquidations
For positions requiring immediate action:
async def execute_direct_liquidation(position: dict):
"""
Direct liquidation (requires holding debt asset):
1. Approve debt asset spending
2. Call liquidationCall on lending protocol
3. Receive collateral + bonus
"""
# Use web3.py or similar to interact with the lending protocol
pass
Filtering Strategies
By Collateral Type
Focus on specific collateral assets:
# Filter for ETH collateral positions (easier to handle)
await ws.send(json.dumps({
'type': 'subscribe',
'filters': {
'collateral_assets': ['WETH', 'stETH', 'wstETH'],
'max_health_factor': 1.05
}
}))
By Position Size
Target larger positions for better economics:
# Only positions with significant debt
await ws.send(json.dumps({
'type': 'subscribe',
'filters': {
'min_debt_usd': 10000,
'min_collateral_usd': 15000,
'max_health_factor': 1.02
}
}))
Multi-Chain Monitoring
Monitor multiple chains simultaneously:
# Create separate streams for each chain
chains = ['ethereum', 'arbitrum', 'optimism', 'base']
async def monitor_chain(chain: str):
url = f"wss://api.axol.io/api/v1/defi/liquidations/stream?token={jwt_token}"
# ... connection logic with chain filter
await asyncio.gather(*[monitor_chain(chain) for chain in chains])
Best Practices
1. Prioritize Speed
# Use connection pooling
connector = aiohttp.TCPConnector(limit=100, keepalive_timeout=30)
session = aiohttp.ClientSession(connector=connector)
# Cache frequently accessed data
position_cache = {}
2. Handle Edge Cases
async def safe_liquidate(position: dict) -> bool:
# Verify position is still liquidatable
current = await get_position(position['borrower'])
if current['health_factor'] >= 1.0:
logger.info("Position no longer liquidatable")
return False
# Check gas economics
gas_price = await get_gas_price()
estimated_gas_cost = gas_price['fast'] * LIQUIDATION_GAS_LIMIT
if position['estimated_profit_usd'] < estimated_gas_cost * 1.5:
logger.info("Profit margin too thin")
return False
return True
3. Monitor Competition
# Track other liquidators
async def on_liquidation_executed(event: dict):
if event['liquidator'] != MY_ADDRESS:
competitor_stats[event['liquidator']] = {
'count': competitor_stats.get(event['liquidator'], {}).get('count', 0) + 1,
'last_seen': datetime.now()
}
4. Graceful Degradation
# Fallback to REST polling if WebSocket fails
async def monitor_with_fallback():
try:
await stream_positions()
except Exception:
logger.warning("Falling back to REST polling")
while True:
positions = await get_at_risk_positions()
for pos in positions:
await handle_position(pos)
await asyncio.sleep(5)
Rate Limits
| Tier | WebSocket Events/sec | REST Calls/min |
|---|---|---|
| Free | ~12 | 60 |
| Starter | ~75 | 300 |
| Professional | ~250 | 1000 |
| Business | ~375 | 3000 |
See Rate Limits for full details.
Next Steps
-
Understand the data:
- DeFi Liquidations API - Endpoint reference
- Liquidation Streaming - WebSocket details
-
Optimize execution:
- Gas Oracle - Optimal gas pricing
- Gateway - Low-latency node access
-
Monitor performance:
- Unified Metrics - Track your usage
Support
Questions about liquidation monitoring?
- Discord: #mev-trading channel
- Email: hello@axol.io