Skip to main content

Mempool Streaming

Stream pending transactions from blockchain mempools with automatic classification for MEV opportunities.

Quick Start

Connect to the mempool WebSocket endpoint:

wss://api.axol.io/api/v1/mempool/{chain}/stream?token=YOUR_JWT_TOKEN

Supported Chains

| Chain | Endpoint |
| --- | --- |
| ethereum | /mempool/ethereum/stream |
| polygon | /mempool/polygon/stream |
| arbitrum | /mempool/arbitrum/stream |
| optimism | /mempool/optimism/stream |
| base | /mempool/base/stream |

Message Format

Transaction Message

{
"type": "pending_transaction",
"data": {
"hash": "0x123...",
"from": "0xabc...",
"to": "0xdef...",
"value": "1000000000000000000",
"gas_price": "25000000000",
"gas_limit": 21000,
"nonce": 42,
"input": "0x...",
"classification": {
"type": "swap",
"protocol": "uniswap_v3",
"tokens": ["WETH", "USDC"],
"amount_in": "1000000000000000000",
"amount_out_min": "3000000000"
},
"mev_potential": {
"type": "arbitrage",
"estimated_profit_usd": 150.50,
"confidence": 0.85
}
},
"timestamp": "2026-01-15T10:30:00.123Z",
"channel": "mempool_ethereum"
}

Transaction Classification

Each transaction is automatically classified by type:

| Type | Description | Example |
| --- | --- | --- |
| swap | Token swap on DEX | Uniswap, Curve trades |
| liquidation | DeFi liquidation | Aave, Compound liquidations |
| transfer | Simple token transfer | ETH or ERC20 transfers |
| flashloan | Flash loan execution | Aave flash loans |
| bridge | Cross-chain bridge | Hop, Across bridges |
| nft | NFT transaction | OpenSea trades |
| contract_deploy | New contract deployment | - |
| unknown | Unclassified | Complex interactions |

Classification Details

For swap transactions:

{
"type": "swap",
"protocol": "uniswap_v3",
"tokens": ["WETH", "USDC"],
"amount_in": "1000000000000000000",
"amount_out_min": "3000000000",
"pool": "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
}

For liquidations:

{
"type": "liquidation",
"protocol": "aave_v3",
"collateral_asset": "0x...",
"debt_asset": "0x...",
"debt_to_cover": "1000000000",
"liquidated_user": "0x..."
}

MEV Detection

High-value transactions include MEV opportunity analysis:

{
"mev_potential": {
"type": "arbitrage",
"estimated_profit_usd": 150.50,
"confidence": 0.85,
"competing_txs": 3,
"gas_recommendation": {
"priority_fee": "5000000000",
"max_fee": "50000000000"
}
}
}

MEV Types

| Type | Description |
| --- | --- |
| arbitrage | Cross-DEX price arbitrage |
| sandwich | Sandwich attack opportunity |
| liquidation | Liquidation opportunity |
| backrun | Backrunning opportunity |
| frontrun | Frontrunning opportunity |

Filtering

Subscribe with Filters

Send a filter message after connecting:

{
"type": "subscribe",
"filters": {
"min_value_eth": 1.0,
"tx_types": ["swap", "liquidation"],
"protocols": ["uniswap_v3", "aave_v3"],
"to_addresses": ["0x..."],
"from_addresses": ["0x..."]
}
}

Filter Options

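| Filter | Type | Description |
| --- | --- | --- |
| min_value_eth | float | Minimum transaction value, in ETH |
| tx_types | string[] | Transaction types to include |
| protocols | string[] | Protocols to include |
| to_addresses | string[] | Filter by destination address |
| from_addresses | string[] | Filter by sender address |
| include_mev_only | boolean | Only deliver transactions flagged as MEV opportunities |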

Code Examples

Python - MEV Bot Foundation

import asyncio
import json
import websockets
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class MempoolTx:
    """A pending transaction parsed from a ``pending_transaction`` message."""

    hash: str
    from_addr: str
    # None when the payload carries no recipient (e.g. a contract-creation
    # transaction has no "to" address -- confirm how the API encodes this).
    to_addr: Optional[str]
    # Integers parsed from the decimal strings in the payload
    # (presumably wei -- confirm against the API reference).
    value: int
    gas_price: int
    classification: dict
    mev_potential: Optional[dict] = None

    @classmethod
    def from_payload(cls, tx_data: dict) -> "MempoolTx":
        """Build a transaction from the ``data`` object of a stream message.

        Args:
            tx_data: The ``data`` field of a ``pending_transaction`` message.

        Returns:
            The parsed transaction. ``to_addr`` is ``None`` when ``"to"`` is
            missing or null; ``classification`` falls back to ``{}`` when it
            is missing or null.

        Raises:
            KeyError: If ``hash``, ``from``, ``value`` or ``gas_price`` is
                missing.
            ValueError: If ``value`` or ``gas_price`` is not an integer string.
        """
        return cls(
            hash=tx_data["hash"],
            from_addr=tx_data["from"],
            to_addr=tx_data.get("to"),
            value=int(tx_data["value"]),
            gas_price=int(tx_data["gas_price"]),
            # "or {}" also covers an explicit null, which would otherwise
            # break the .get() lookup in MempoolStream._dispatch.
            classification=tx_data.get("classification") or {},
            mev_potential=tx_data.get("mev_potential"),
        )


class MempoolStream:
    """WebSocket client that routes pending transactions to typed handlers."""

    def __init__(self, jwt_token: str, chain: str = "ethereum"):
        """Store credentials and build the stream URL; does not connect.

        Args:
            jwt_token: JWT appended to the URL as the ``token`` query parameter.
            chain: Chain name interpolated into the endpoint path.
        """
        self.jwt_token = jwt_token
        self.chain = chain
        self.ws_url = f"wss://api.axol.io/api/v1/mempool/{chain}/stream"
        self.handlers: dict[str, list[Callable]] = {}
        self._ws = None
        # Filters from the most recent subscribe() call, replayed by stream()
        # after it reconnects.
        self._filters: Optional[dict] = None

    async def connect(self):
        """Connect to mempool stream."""
        url = f"{self.ws_url}?token={self.jwt_token}"
        self._ws = await websockets.connect(url)
        print(f"Connected to {self.chain} mempool")

    async def subscribe(self, filters: Optional[dict] = None):
        """Subscribe with optional filters.

        The filters are remembered so that ``stream()`` can re-send them after
        a reconnect. Nothing is sent when ``filters`` is ``None`` or empty.

        Args:
            filters: Filter options to send in a ``subscribe`` message.
        """
        self._filters = filters
        if filters:
            await self._ws.send(json.dumps({
                "type": "subscribe",
                "filters": filters,
            }))

    def on_tx(self, tx_type: str = "all"):
        """Decorator to register transaction handlers.

        Args:
            tx_type: Classification type to handle, or ``"all"`` for every
                dispatched transaction.

        Returns:
            A decorator that registers the coroutine function and returns it
            unchanged.
        """
        def decorator(func: Callable):
            self.handlers.setdefault(tx_type, []).append(func)
            return func
        return decorator

    async def _dispatch(self, tx: MempoolTx):
        """Dispatch transaction to registered handlers.

        Handlers registered for the transaction's classification type run
        first, then the ``"all"`` handlers; each is awaited in turn.
        """
        tx_type = tx.classification.get("type", "unknown")

        # Call specific handlers
        for handler in self.handlers.get(tx_type, ()):
            await handler(tx)

        # Call catch-all handlers
        for handler in self.handlers.get("all", ()):
            await handler(tx)

    async def stream(self):
        """Start streaming transactions.

        Loops forever: receives a message, and dispatches it when its type is
        ``pending_transaction``. When the connection closes, waits one second,
        reconnects and re-sends the last subscription filters.
        """
        while True:
            try:
                message = await self._ws.recv()
                data = json.loads(message)

                if data.get("type") == "pending_transaction":
                    await self._dispatch(MempoolTx.from_payload(data["data"]))

            except websockets.ConnectionClosed:
                print("Connection closed, reconnecting...")
                await asyncio.sleep(1)
                await self.connect()
                # Filters are sent per connection (see subscribe), so the new
                # socket needs them again.
                await self.subscribe(self._filters)

    async def close(self):
        """Close connection."""
        if self._ws:
            await self._ws.close()


# Usage
async def main():
    """Connect to the Ethereum mempool and print notable swaps and liquidations.

    Registers one handler per transaction type, subscribes with filters, then
    streams until interrupted. The connection is closed on the way out.
    """
    stream = MempoolStream(
        jwt_token="YOUR_JWT_TOKEN",
        chain="ethereum"
    )

    @stream.on_tx("swap")
    async def handle_swap(tx: MempoolTx):
        """Handle swap transactions."""
        if tx.mev_potential:
            # "or 0" guards against an explicit null profit in the payload.
            profit = tx.mev_potential.get("estimated_profit_usd", 0) or 0
            if profit > 100:
                print(f"High-value swap detected: {tx.hash}")
                print(f"  Estimated profit: ${profit:.2f}")
                # Execute your MEV strategy here

    @stream.on_tx("liquidation")
    async def handle_liquidation(tx: MempoolTx):
        """Handle liquidation transactions."""
        print(f"Liquidation detected: {tx.hash}")
        protocol = tx.classification.get("protocol", "unknown")
        print(f"  Protocol: {protocol}")
        # Check if you can beat this liquidation

    await stream.connect()
    try:
        # Filter for high-value swaps and liquidations
        await stream.subscribe({
            "min_value_eth": 0.5,
            "tx_types": ["swap", "liquidation"],
            "include_mev_only": True
        })

        await stream.stream()
    finally:
        # stream() only exits via an exception or cancellation (e.g. Ctrl+C);
        # close the socket either way instead of leaking it.
        await stream.close()


if __name__ == "__main__":
    asyncio.run(main())

TypeScript - Real-time Monitor

/**
 * Pending transaction carried in the `data` field of a
 * `pending_transaction` message.
 *
 * Fields added here are optional so that existing object literals of this
 * type keep compiling, even though the documented sample payload includes
 * `nonce` and `input`.
 */
interface MempoolTx {
  hash: string;
  from: string;
  to: string;
  /** Decimal string, as in the sample payload. */
  value: string;
  /** Decimal string, as in the sample payload. */
  gas_price: string;
  gas_limit: number;
  nonce?: number;
  /** Hex-encoded call data. */
  input?: string;
  classification: {
    type: string;
    protocol?: string;
    // Swap details
    tokens?: string[];
    amount_in?: string;
    amount_out_min?: string;
    pool?: string;
    // Liquidation details
    collateral_asset?: string;
    debt_asset?: string;
    debt_to_cover?: string;
    liquidated_user?: string;
  };
  mev_potential?: {
    type: string;
    estimated_profit_usd: number;
    confidence: number;
    competing_txs?: number;
    gas_recommendation?: {
      priority_fee: string;
      max_fee: string;
    };
  };
}

/** Filter options accepted in the `filters` field of a `subscribe` message. */
interface StreamFilters {
  /** Minimum transaction value. */
  min_value_eth?: number;
  /** Transaction types to include. */
  tx_types?: string[];
  /** Protocols to include. */
  protocols?: string[];
  /** Filter by destination address. */
  to_addresses?: string[];
  /** Filter by sender address. */
  from_addresses?: string[];
  /** Only deliver MEV opportunities. */
  include_mev_only?: boolean;
}

/** Callback invoked with each matching transaction; may be sync or async. */
type TxHandler = (tx: MempoolTx) => void | Promise<void>;

/**
 * WebSocket client for the mempool stream that routes each pending
 * transaction to handlers registered by classification type.
 *
 * Reconnects with exponential backoff when the connection drops, re-sends
 * the last subscription after each reconnect, and stops reconnecting once
 * `close()` has been called.
 */
class MempoolMonitor {
  private ws: WebSocket | null = null;
  private handlers: Map<string, TxHandler[]> = new Map();
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  // Last subscribe() call, replayed whenever a socket opens; null until
  // subscribe() has been called at least once.
  private subscription: { filters?: StreamFilters } | null = null;
  // Set by close() so the resulting close event does not trigger a reconnect.
  private closedByClient = false;

  constructor(
    private jwtToken: string,
    private chain: string = 'ethereum'
  ) {}

  /**
   * Open the WebSocket connection.
   *
   * @returns A promise that resolves once the socket is open and rejects if
   *   the socket reports an error before opening.
   */
  async connect(): Promise<void> {
    const url = `wss://api.axol.io/api/v1/mempool/${this.chain}/stream?token=${this.jwtToken}`;
    this.closedByClient = false;

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(url);
      this.ws = ws;

      ws.onopen = () => {
        console.log(`Connected to ${this.chain} mempool`);
        this.reconnectAttempts = 0;
        // Filters are sent per connection, so replay them on a new socket.
        if (this.subscription) {
          this.sendSubscription(ws, this.subscription.filters);
        }
        resolve();
      };

      ws.onclose = () => {
        console.log('Connection closed');
        // An intentional close() must not start a reconnect cycle.
        if (this.closedByClient) return;
        void this.handleReconnect();
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        reject(error);
      };

      ws.onmessage = (event) => {
        let message: { type: string; data: MempoolTx };
        try {
          message = JSON.parse(event.data);
        } catch (error) {
          // One malformed frame should not break the message loop.
          console.error('Ignoring unparseable message:', error);
          return;
        }
        this.handleMessage(message);
      };
    });
  }

  /** Wait with exponential backoff, then reconnect, up to the attempt limit. */
  private async handleReconnect(): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error(`Giving up after ${this.maxReconnectAttempts} reconnect attempts`);
      return;
    }

    this.reconnectAttempts++;
    const delay = Math.pow(2, this.reconnectAttempts) * 1000;
    console.log(`Reconnecting in ${delay}ms...`);
    await new Promise(r => setTimeout(r, delay));

    // close() may have been called while waiting.
    if (this.closedByClient) return;

    try {
      await this.connect();
    } catch (error) {
      // Swallow the rejection here: the failed socket's close event
      // schedules the next attempt, so rethrowing would only produce an
      // unhandled promise rejection.
      console.error('Reconnect attempt failed:', error);
    }
  }

  /**
   * Send a subscribe message and remember it for future reconnects.
   * If the socket is not open yet, the message is sent when it opens.
   */
  subscribe(filters?: StreamFilters): void {
    this.subscription = { filters };
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.sendSubscription(this.ws, filters);
    }
  }

  /** Serialize and send one subscribe message on the given socket. */
  private sendSubscription(ws: WebSocket, filters?: StreamFilters): void {
    ws.send(JSON.stringify({
      type: 'subscribe',
      filters
    }));
  }

  /** Register a handler for a classification type, or `'all'` for every transaction. */
  onTransaction(type: string, handler: TxHandler): void {
    if (!this.handlers.has(type)) {
      this.handlers.set(type, []);
    }
    this.handlers.get(type)!.push(handler);
  }

  /** Route a `pending_transaction` message to type-specific, then catch-all handlers. */
  private handleMessage(message: { type: string; data: MempoolTx }): void {
    if (message.type !== 'pending_transaction') return;

    const tx = message.data;
    const txType = tx.classification?.type || 'unknown';

    // Isolate each handler so one throwing or rejecting handler neither
    // skips the remaining handlers nor surfaces as an unhandled rejection.
    const invoke = (handler: TxHandler): void => {
      try {
        Promise.resolve(handler(tx)).catch(error => {
          console.error('Transaction handler failed:', error);
        });
      } catch (error) {
        console.error('Transaction handler failed:', error);
      }
    };

    // Call specific handlers
    (this.handlers.get(txType) || []).forEach(invoke);

    // Call catch-all handlers
    (this.handlers.get('all') || []).forEach(invoke);
  }

  /** Close the connection and suppress automatic reconnection. */
  close(): void {
    this.closedByClient = true;
    this.ws?.close();
  }
}

// Usage
/**
 * Monitor the Ethereum mempool for ten minutes, logging MEV-flagged swaps
 * and liquidations, then print summary statistics and disconnect.
 *
 * @throws Error if the JWT_TOKEN environment variable is not set.
 */
async function main() {
  // Fail fast: a non-null assertion here would send the literal string
  // "undefined" as the token when the variable is missing.
  const jwtToken = process.env.JWT_TOKEN;
  if (!jwtToken) {
    throw new Error('JWT_TOKEN environment variable is not set');
  }

  const monitor = new MempoolMonitor(
    jwtToken,
    'ethereum'
  );

  // Track statistics
  const stats = {
    total: 0,
    swaps: 0,
    liquidations: 0,
    mevOpportunities: 0,
    totalProfitUsd: 0
  };

  // Handle swaps
  monitor.onTransaction('swap', (tx) => {
    stats.swaps++;

    if (tx.mev_potential) {
      stats.mevOpportunities++;
      stats.totalProfitUsd += tx.mev_potential.estimated_profit_usd;

      console.log(`\n[SWAP] ${tx.hash.slice(0, 10)}...`);
      console.log(`  Protocol: ${tx.classification.protocol}`);
      console.log(`  Tokens: ${tx.classification.tokens?.join(' -> ')}`);
      console.log(`  MEV Type: ${tx.mev_potential.type}`);
      console.log(`  Est. Profit: $${tx.mev_potential.estimated_profit_usd.toFixed(2)}`);
    }
  });

  // Handle liquidations
  monitor.onTransaction('liquidation', (tx) => {
    stats.liquidations++;
    console.log(`\n[LIQUIDATION] ${tx.hash.slice(0, 10)}...`);
    console.log(`  Protocol: ${tx.classification.protocol}`);
  });

  // Track all transactions
  monitor.onTransaction('all', () => {
    stats.total++;
    if (stats.total % 100 === 0) {
      console.log(`\n--- Stats: ${stats.total} txs, ${stats.swaps} swaps, ${stats.liquidations} liquidations ---`);
    }
  });

  await monitor.connect();

  // Subscribe with filters
  monitor.subscribe({
    tx_types: ['swap', 'liquidation'],
    include_mev_only: true
  });

  console.log('Monitoring mempool...');

  // Run for 10 minutes then print final stats
  setTimeout(() => {
    console.log('\n=== Final Statistics ===');
    console.log(`Total transactions: ${stats.total}`);
    console.log(`Swaps: ${stats.swaps}`);
    console.log(`Liquidations: ${stats.liquidations}`);
    console.log(`MEV Opportunities: ${stats.mevOpportunities}`);
    console.log(`Total Est. Profit: $${stats.totalProfitUsd.toFixed(2)}`);
    monitor.close();
  }, 10 * 60 * 1000);
}

main().catch(console.error);

Best Practices

1. Use Filters to Reduce Volume

Unfiltered mempool streams can be overwhelming:

# Bad: an empty filter set leaves the stream unfiltered, so every pending
# transaction is delivered
await stream.subscribe({})

# Good: narrow the stream to the value range, transaction types and protocols
# you actually act on
await stream.subscribe({
    "min_value_eth": 0.1,
    "tx_types": ["swap", "liquidation"],
    "protocols": ["uniswap_v3"]
})

2. Handle Reconnection

Always implement reconnection logic:

async def stream_with_reconnect(filters: "dict | None" = None):
    """Run the stream forever, reconnecting after any failure.

    Each iteration connects, re-sends ``filters`` and streams until an error
    occurs. The old socket is closed before the next attempt so failed
    connections are not leaked.

    Args:
        filters: Optional subscription filters to send after every
            (re)connect. ``None`` keeps the stream unfiltered.
    """
    while True:
        try:
            await stream.connect()
            # A new connection has no subscription yet, so send the filters
            # again each time.
            await stream.subscribe(filters)
            await stream.stream()
        except Exception as e:
            print(f"Error: {e}, reconnecting...")
        finally:
            # Also runs on cancellation, so the socket is released on shutdown.
            await stream.close()
        await asyncio.sleep(5)

3. Process Asynchronously

Don't block the message loop:

# Strong references to in-flight tasks. The event loop only keeps weak
# references, so a task whose result is discarded can be garbage-collected
# before it finishes.
_background_tasks = set()


@stream.on_tx("swap")
async def handle_swap(tx):
    """Hand the swap off to a background task so the message loop is not blocked."""
    # Spawn task for heavy processing
    task = asyncio.create_task(process_swap(tx))
    _background_tasks.add(task)
    # Drop the reference once the task completes so the set does not grow.
    task.add_done_callback(_background_tasks.discard)

4. Monitor Your CU Usage

Mempool streaming consumes CUs quickly:

| Filter Level | Approx. Events/sec | CUs/hour |
| --- | --- | --- |
| Unfiltered | 100-500 | 14.4M-72M |
| Swaps only | 10-50 | 1.4M-7.2M |
| Large swaps (>1 ETH) | 1-5 | 144K-720K |
| MEV opportunities | 0.1-1 | 14K-144K |

See Also