Mempool Streaming
Stream pending transactions from blockchain mempools with automatic classification for MEV opportunities.
Quick Start
Connect to the mempool WebSocket endpoint:
wss://api.axol.io/api/v1/mempool/{chain}/stream?token=YOUR_JWT_TOKEN
Supported Chains
| Chain | Endpoint |
|---|---|
| ethereum | /mempool/ethereum/stream |
| polygon | /mempool/polygon/stream |
| arbitrum | /mempool/arbitrum/stream |
| optimism | /mempool/optimism/stream |
| base | /mempool/base/stream |
Message Format
Transaction Message
{
"type": "pending_transaction",
"data": {
"hash": "0x123...",
"from": "0xabc...",
"to": "0xdef...",
"value": "1000000000000000000",
"gas_price": "25000000000",
"gas_limit": 21000,
"nonce": 42,
"input": "0x...",
"classification": {
"type": "swap",
"protocol": "uniswap_v3",
"tokens": ["WETH", "USDC"],
"amount_in": "1000000000000000000",
"amount_out_min": "3000000000"
},
"mev_potential": {
"type": "arbitrage",
"estimated_profit_usd": 150.50,
"confidence": 0.85
}
},
"timestamp": "2026-01-15T10:30:00.123Z",
"channel": "mempool_ethereum"
}
Transaction Classification
Each transaction is automatically classified by type:
| Type | Description | Example |
|---|---|---|
| swap | Token swap on DEX | Uniswap, Curve trades |
| liquidation | DeFi liquidation | Aave, Compound liquidations |
| transfer | Simple token transfer | ETH or ERC20 transfers |
| flashloan | Flash loan execution | Aave flash loans |
| bridge | Cross-chain bridge | Hop, Across bridges |
| nft | NFT transaction | OpenSea trades |
| contract_deploy | New contract deployment | - |
| unknown | Unclassified | Complex interactions |
Classification Details
For swap transactions:
{
"type": "swap",
"protocol": "uniswap_v3",
"tokens": ["WETH", "USDC"],
"amount_in": "1000000000000000000",
"amount_out_min": "3000000000",
"pool": "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
}
For liquidations:
{
"type": "liquidation",
"protocol": "aave_v3",
"collateral_asset": "0x...",
"debt_asset": "0x...",
"debt_to_cover": "1000000000",
"liquidated_user": "0x..."
}
MEV Detection
High-value transactions include MEV opportunity analysis:
{
"mev_potential": {
"type": "arbitrage",
"estimated_profit_usd": 150.50,
"confidence": 0.85,
"competing_txs": 3,
"gas_recommendation": {
"priority_fee": "5000000000",
"max_fee": "50000000000"
}
}
}
MEV Types
| Type | Description |
|---|---|
| arbitrage | Cross-DEX price arbitrage |
| sandwich | Sandwich attack opportunity |
| liquidation | Liquidation opportunity |
| backrun | Backrunning opportunity |
| frontrun | Frontrunning opportunity |
Filtering
Subscribe with Filters
Send a filter message after connecting:
{
"type": "subscribe",
"filters": {
"min_value_eth": 1.0,
"tx_types": ["swap", "liquidation"],
"protocols": ["uniswap_v3", "aave_v3"],
"to_addresses": ["0x..."],
"from_addresses": ["0x..."]
}
}
Filter Options
| Filter | Type | Description |
|---|---|---|
| min_value_eth | float | Minimum transaction value |
| tx_types | string[] | Transaction types to include |
| protocols | string[] | Protocols to include |
| to_addresses | string[] | Filter by destination |
| from_addresses | string[] | Filter by sender |
| include_mev_only | boolean | Only MEV opportunities |
Code Examples
Python - MEV Bot Foundation
import asyncio
import json
import websockets
from dataclasses import dataclass
from typing import Callable, Optional
@dataclass
class MempoolTx:
    """Pending transaction parsed from a ``pending_transaction`` message."""

    hash: str
    # Sender address, taken from the message's "from" field.
    from_addr: str
    # Recipient address from the "to" field; None when the transaction has
    # no recipient (e.g. a contract deployment).
    to_addr: Optional[str]
    # Integer parsed from the decimal string in the "value" field.
    value: int
    # Integer parsed from the decimal string in the "gas_price" field.
    gas_price: int
    # Classification object from the message; its "type" key selects handlers.
    classification: dict
    # MEV analysis object, or None when the message carries none.
    mev_potential: Optional[dict] = None
class MempoolStream:
    """Asynchronous client for the mempool WebSocket stream.

    Handlers are registered per classification type with :meth:`on_tx` and
    are awaited for every ``pending_transaction`` message read by
    :meth:`stream`.
    """

    def __init__(self, jwt_token: str, chain: str = "ethereum"):
        """Store the credentials and build the stream URL for ``chain``."""
        self.jwt_token = jwt_token
        self.chain = chain
        self.ws_url = f"wss://api.axol.io/api/v1/mempool/{chain}/stream"
        self.handlers: dict[str, list[Callable]] = {}
        self._ws = None
        # Last filters passed to subscribe(). They are replayed after a
        # reconnect because a fresh socket has not received the subscribe
        # message yet.
        self._filters: Optional[dict] = None

    async def connect(self):
        """Connect to mempool stream."""
        url = f"{self.ws_url}?token={self.jwt_token}"
        self._ws = await websockets.connect(url)
        print(f"Connected to {self.chain} mempool")

    async def subscribe(self, filters: Optional[dict] = None):
        """Subscribe with optional filters.

        The filters are remembered so that :meth:`stream` can send them
        again after it reconnects. An empty or missing ``filters`` sends
        nothing.

        Raises:
            RuntimeError: If filters are given before :meth:`connect`.
        """
        self._filters = filters
        if not filters:
            return
        if self._ws is None:
            raise RuntimeError("connect() must be called before subscribe()")
        await self._ws.send(json.dumps({
            "type": "subscribe",
            "filters": filters,
        }))

    def on_tx(self, tx_type: str = "all"):
        """Decorator to register transaction handlers.

        ``tx_type`` is matched against the transaction's classification
        type; ``"all"`` registers a catch-all handler.
        """
        def decorator(func: Callable):
            self.handlers.setdefault(tx_type, []).append(func)
            return func
        return decorator

    async def _dispatch(self, tx: "MempoolTx"):
        """Dispatch transaction to registered handlers."""
        tx_type = tx.classification.get("type", "unknown")
        # Specific handlers run first, then the catch-all handlers.
        for handler in self.handlers.get(tx_type, []):
            await handler(tx)
        for handler in self.handlers.get("all", []):
            await handler(tx)

    @staticmethod
    def _parse_tx(tx_data: dict) -> "MempoolTx":
        """Build a MempoolTx from the ``data`` object of a stream message."""
        return MempoolTx(
            hash=tx_data["hash"],
            from_addr=tx_data["from"],
            # Absent or null when the transaction has no recipient.
            to_addr=tx_data.get("to"),
            value=int(tx_data["value"]),
            gas_price=int(tx_data["gas_price"]),
            # A null classification is treated as empty so that dispatch
            # falls back to the "unknown" type instead of failing.
            classification=tx_data.get("classification") or {},
            mev_potential=tx_data.get("mev_potential"),
        )

    async def stream(self):
        """Start streaming transactions.

        Runs until cancelled or until an error other than a closed
        connection is raised. When the connection closes, it waits one
        second, reconnects, and re-sends the stored filters.

        Raises:
            RuntimeError: If called before :meth:`connect`.
        """
        if self._ws is None:
            raise RuntimeError("connect() must be called before stream()")
        resubscribe = False
        while True:
            try:
                # Re-sent inside the try block so that a drop during the
                # re-subscribe takes the same reconnect path.
                if resubscribe:
                    await self.subscribe(self._filters)
                    resubscribe = False
                message = await self._ws.recv()
                data = json.loads(message)
                if data.get("type") == "pending_transaction":
                    await self._dispatch(self._parse_tx(data["data"]))
            except websockets.ConnectionClosed:
                print("Connection closed, reconnecting...")
                await asyncio.sleep(1)
                await self.connect()
                resubscribe = True

    async def close(self):
        """Close connection."""
        if self._ws:
            await self._ws.close()
# Usage
async def main():
    """Register swap and liquidation handlers, subscribe, and stream."""
    stream = MempoolStream(
        jwt_token="YOUR_JWT_TOKEN",
        chain="ethereum"
    )

    @stream.on_tx("swap")
    async def handle_swap(tx: MempoolTx):
        """Handle swap transactions."""
        if tx.mev_potential:
            profit = tx.mev_potential.get("estimated_profit_usd", 0)
            if profit > 100:
                print(f"High-value swap detected: {tx.hash}")
                print(f"  Estimated profit: ${profit:.2f}")
                # Execute your MEV strategy here

    @stream.on_tx("liquidation")
    async def handle_liquidation(tx: MempoolTx):
        """Handle liquidation transactions."""
        print(f"Liquidation detected: {tx.hash}")
        protocol = tx.classification.get("protocol", "unknown")
        print(f"  Protocol: {protocol}")
        # Check if you can beat this liquidation

    try:
        await stream.connect()
        # Filter for high-value swaps and liquidations
        await stream.subscribe({
            "min_value_eth": 0.5,
            "tx_types": ["swap", "liquidation"],
            "include_mev_only": True
        })
        await stream.stream()
    finally:
        # Release the socket on error or cancellation (e.g. Ctrl+C).
        await stream.close()


# Guarded so that importing this module does not start the stream.
if __name__ == "__main__":
    asyncio.run(main())
TypeScript - Real-time Monitor
/**
 * Payload of a `pending_transaction` message (its `data` field).
 * Numeric amounts that arrive as decimal strings are typed as `string`.
 */
interface MempoolTx {
  hash: string;
  from: string;
  to: string;
  value: string;
  gas_price: string;
  gas_limit: number;
  nonce: number;
  input: string;
  classification: {
    type: string;
    protocol?: string;
    // Present on swap classifications.
    tokens?: string[];
    amount_in?: string;
    amount_out_min?: string;
    pool?: string;
    // Present on liquidation classifications.
    collateral_asset?: string;
    debt_asset?: string;
    debt_to_cover?: string;
    liquidated_user?: string;
  };
  mev_potential?: {
    type: string;
    estimated_profit_usd: number;
    confidence: number;
    competing_txs?: number;
    gas_recommendation?: {
      priority_fee: string;
      max_fee: string;
    };
  };
}
/** Filters sent in the `filters` field of a `subscribe` message; all are optional. */
interface StreamFilters {
  min_value_eth?: number;
  tx_types?: string[];
  protocols?: string[];
  to_addresses?: string[];
  from_addresses?: string[];
  include_mev_only?: boolean;
}
/** Callback that receives a transaction; it may be synchronous or return a Promise. */
type TxHandler = (tx: MempoolTx) => void | Promise<void>;
/**
 * WebSocket client for the mempool stream.
 *
 * Handlers are registered per classification type with `onTransaction` and
 * invoked for every `pending_transaction` message. After an unexpected close
 * the monitor reconnects with exponential backoff (up to
 * `maxReconnectAttempts` attempts) and re-sends the last subscription.
 */
class MempoolMonitor {
  private ws: WebSocket | null = null;
  private handlers: Map<string, TxHandler[]> = new Map();
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  // Set by close() so that the resulting close event does not reconnect.
  private closedByUser = false;
  // Last subscription, replayed on every new socket.
  private hasSubscription = false;
  private lastFilters: StreamFilters | undefined = undefined;

  constructor(
    private jwtToken: string,
    private chain: string = 'ethereum'
  ) {}

  /**
   * Open the socket. Resolves once it is open; rejects on a socket error.
   * A subscription requested earlier is re-sent as soon as the socket opens.
   */
  async connect(): Promise<void> {
    this.closedByUser = false;
    const url = `wss://api.axol.io/api/v1/mempool/${this.chain}/stream?token=${this.jwtToken}`;

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(url);
      this.ws = ws;

      ws.onopen = () => {
        console.log(`Connected to ${this.chain} mempool`);
        this.reconnectAttempts = 0;
        // A new socket has not received the subscribe message yet.
        if (this.hasSubscription) {
          this.subscribe(this.lastFilters);
        }
        resolve();
      };

      ws.onclose = () => {
        console.log('Connection closed');
        if (!this.closedByUser) {
          void this.handleReconnect();
        }
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        reject(error);
      };

      ws.onmessage = (event) => {
        let message: { type: string; data: MempoolTx };
        try {
          message = JSON.parse(event.data);
        } catch (err) {
          // One malformed frame must not take down the message handler.
          console.error('Ignoring malformed message:', err);
          return;
        }
        this.handleMessage(message);
      };
    });
  }

  /** Wait with exponential backoff, then reconnect unless close() was called. */
  private async handleReconnect(): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error(`Giving up after ${this.maxReconnectAttempts} reconnect attempts`);
      return;
    }

    this.reconnectAttempts++;
    const delay = Math.pow(2, this.reconnectAttempts) * 1000;
    console.log(`Reconnecting in ${delay}ms...`);
    await new Promise(r => setTimeout(r, delay));

    // close() may have been called while waiting.
    if (this.closedByUser) return;

    try {
      await this.connect();
    } catch {
      // Already logged by onerror. The failed socket's close event schedules
      // the next attempt, so the rejection only needs to be contained here.
    }
  }

  /**
   * Send a subscribe message if the socket is open. The filters are
   * remembered either way and sent again whenever a socket opens.
   */
  subscribe(filters?: StreamFilters): void {
    this.hasSubscription = true;
    this.lastFilters = filters;

    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({
        type: 'subscribe',
        filters
      }));
    }
  }

  /** Register a handler for a classification type, or `'all'` for every transaction. */
  onTransaction(type: string, handler: TxHandler): void {
    if (!this.handlers.has(type)) {
      this.handlers.set(type, []);
    }
    this.handlers.get(type)!.push(handler);
  }

  /** Route a `pending_transaction` message to type-specific, then catch-all handlers. */
  private handleMessage(message: { type: string; data: MempoolTx }): void {
    if (message.type !== 'pending_transaction') return;

    const tx = message.data;
    const txType = tx.classification?.type || 'unknown';

    // Call specific handlers
    const handlers = this.handlers.get(txType) || [];
    handlers.forEach(handler => this.invoke(handler, tx));

    // Call catch-all handlers
    const allHandlers = this.handlers.get('all') || [];
    allHandlers.forEach(handler => this.invoke(handler, tx));
  }

  /** Run one handler, logging a synchronous throw or an async rejection. */
  private invoke(handler: TxHandler, tx: MempoolTx): void {
    const report = (err: unknown) => console.error('Transaction handler failed:', err);
    try {
      Promise.resolve(handler(tx)).catch(report);
    } catch (err) {
      report(err);
    }
  }

  /** Close the socket and suppress automatic reconnection. */
  close(): void {
    this.closedByUser = true;
    this.ws?.close();
  }
}
// Usage
/**
 * Monitor the Ethereum mempool for ten minutes, logging swaps and
 * liquidations, then print aggregate statistics and close the connection.
 */
async function main() {
  // Fail fast: without this check the monitor would connect with the
  // literal token "undefined".
  const jwtToken = process.env.JWT_TOKEN;
  if (!jwtToken) {
    throw new Error('JWT_TOKEN environment variable is not set');
  }

  const monitor = new MempoolMonitor(
    jwtToken,
    'ethereum'
  );

  // Track statistics
  const stats = {
    total: 0,
    swaps: 0,
    liquidations: 0,
    mevOpportunities: 0,
    totalProfitUsd: 0
  };

  // Handle swaps
  monitor.onTransaction('swap', (tx) => {
    stats.swaps++;

    if (tx.mev_potential) {
      stats.mevOpportunities++;
      stats.totalProfitUsd += tx.mev_potential.estimated_profit_usd;

      console.log(`\n[SWAP] ${tx.hash.slice(0, 10)}...`);
      console.log(`  Protocol: ${tx.classification.protocol}`);
      console.log(`  Tokens: ${tx.classification.tokens?.join(' -> ')}`);
      console.log(`  MEV Type: ${tx.mev_potential.type}`);
      console.log(`  Est. Profit: $${tx.mev_potential.estimated_profit_usd.toFixed(2)}`);
    }
  });

  // Handle liquidations
  monitor.onTransaction('liquidation', (tx) => {
    stats.liquidations++;
    console.log(`\n[LIQUIDATION] ${tx.hash.slice(0, 10)}...`);
    console.log(`  Protocol: ${tx.classification.protocol}`);
  });

  // Track all transactions
  monitor.onTransaction('all', () => {
    stats.total++;

    if (stats.total % 100 === 0) {
      console.log(`\n--- Stats: ${stats.total} txs, ${stats.swaps} swaps, ${stats.liquidations} liquidations ---`);
    }
  });

  await monitor.connect();

  // Subscribe with filters
  monitor.subscribe({
    tx_types: ['swap', 'liquidation'],
    include_mev_only: true
  });

  console.log('Monitoring mempool...');

  // Run for 10 minutes then print final stats
  setTimeout(() => {
    console.log('\n=== Final Statistics ===');
    console.log(`Total transactions: ${stats.total}`);
    console.log(`Swaps: ${stats.swaps}`);
    console.log(`Liquidations: ${stats.liquidations}`);
    console.log(`MEV Opportunities: ${stats.mevOpportunities}`);
    console.log(`Total Est. Profit: $${stats.totalProfitUsd.toFixed(2)}`);
    monitor.close();
  }, 10 * 60 * 1000);
}

main().catch(console.error);
Best Practices
1. Use Filters to Reduce Volume
Unfiltered mempool streams can be overwhelming:
# Bad: no filters are given, so the full unfiltered stream is received
await stream.subscribe({})
# Good: narrow the stream by value, transaction type, and protocol
await stream.subscribe({
    "min_value_eth": 0.1,
    "tx_types": ["swap", "liquidation"],
    "protocols": ["uniswap_v3"]
})
2. Handle Reconnection
Always implement reconnection logic:
async def stream_with_reconnect(filters=None, retry_delay=5.0):
    """Keep the module-level ``stream`` running, reconnecting after any error.

    Args:
        filters: Optional filter dict re-sent after every (re)connect, since
            a new connection has not received the subscribe message.
        retry_delay: Seconds to wait before reconnecting after an error.
    """
    while True:
        try:
            await stream.connect()
            await stream.subscribe(filters)
            await stream.stream()
        except Exception as e:
            print(f"Error: {e}, reconnecting...")
            # Drop the old socket before opening a new one so it is not leaked.
            await stream.close()
            await asyncio.sleep(retry_delay)
3. Process Asynchronously
Don't block the message loop:
# Strong references to in-flight tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks = set()


@stream.on_tx("swap")
async def handle_swap(tx):
    """Hand heavy processing to a background task so the message loop keeps reading."""
    # Spawn task for heavy processing
    task = asyncio.create_task(process_swap(tx))
    _background_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(_background_tasks.discard)
4. Monitor Your CU Usage
Mempool streaming consumes CUs quickly:
| Filter Level | Approx. Events/sec | CUs/hour |
|---|---|---|
| Unfiltered | 100-500 | 14.4M-72M |
| Swaps only | 10-50 | 1.4M-7.2M |
| Large swaps (>1 ETH) | 1-5 | 144K-720K |
| MEV opportunities | 0.1-1 | 14K-144K |
See Also
- Mempool API (REST) - REST endpoints
- MEV Overview - MEV trading features
- WebSockets - WebSocket basics
- Rate Limits - CU consumption