版本: v1.0.0
日期: 2025-10-09
负责人: Execution Layer Team
依赖文档: M16_INCREMENTAL_GRID_DESIGN.md, MODULE_INTERFACES.md
从实盘日志分析发现的速率限制问题:
初始化 (20 订单):
- 耗时: 1093ms
- 平均每单: 55ms
- 状态: ✅ 正常
第 1 次调整 (20 订单):
- 耗时: 58,241ms (~58s)
- 平均每单: 2912ms
- 状态: ⚠️ 限流警告
第 2 次调整 (20 订单):
- 耗时: 118,668ms (~119s)
- 平均每单: 5933ms
- 状态: 🚫 严重限流
第 3 次调整 (20 订单):
- 耗时: 178,227ms (~178s)
- 平均每单: 8911ms
- 状态: 🚫 极度限流
根本原因分析:
估算参数:
- Burst capacity: ~10 requests
- Refill rate: ~5 requests/second
- Penalty window: ~60 seconds
- Penalty multiplier: 每超 1 个请求 +500ms 延迟
模拟计算:
第 1 批 20 个请求同时发送:
- 前 10 个: 正常处理 (~100ms each)
- 后 10 个: 触发限流
- 第 11 个: +500ms delay
- 第 12 个: +1000ms delay
- ...
- 第 20 个: +5000ms delay
平均延迟: (10×100 + 10×2750) / 20 = 1425ms per request(其中 2750ms 为后 10 个请求的平均附加延迟: (500 + 5000) / 2)
总耗时: ~28s(按串行累加估算: 10×100 + 10×2750 = 28,500ms)
┌──────────────────┐
│ Token Bucket │
│ Capacity: 10 │ ← burst 容量
│ Tokens: 7 │ ← 当前令牌数
│ Refill: 5/s │ ← 补充速率
└──────────────────┘
↑
│ refill
│
┌────┴────┐
│ Timer │
└─────────┘
每个请求消耗 1 token:
- 有 token → 立即发送
- 无 token → 进入队列等待
/**
 * Token-bucket rate limiter.
 *
 * The bucket starts full (`capacity` tokens) and is topped up lazily, on every
 * public call, in proportion to the wall-clock time elapsed since the previous
 * top-up. Fractional tokens are tracked internally.
 */
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  /**
   * @param capacity   Maximum number of tokens the bucket can hold (burst size).
   * @param refillRate Tokens added per second.
   */
  constructor(
    private readonly capacity: number,
    private readonly refillRate: number
  ) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  /**
   * Try to take `count` tokens.
   *
   * @returns `true` and deducts the tokens when enough are available;
   *          `false` (bucket unchanged) otherwise.
   */
  tryConsume(count: number = 1): boolean {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  /**
   * Add the tokens earned since the last refill, capped at `capacity`.
   *
   * `Date.now()` is wall-clock time and may jump backwards (NTP correction,
   * manual clock change). A non-positive elapsed time must never remove
   * tokens, so it is treated as "no time has passed"; the reference point is
   * still moved to `now` so refilling resumes from the adjusted clock instead
   * of stalling until the clock catches up.
   */
  private refill(): void {
    const now = Date.now();
    const elapsedMs = now - this.lastRefill;
    this.lastRefill = now;
    if (elapsedMs <= 0) {
      return;
    }
    const tokensToAdd = (elapsedMs / 1000) * this.refillRate;
    this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
  }

  /** @returns The number of whole tokens currently available. */
  available(): number {
    this.refill();
    return Math.floor(this.tokens);
  }

  /**
   * Estimate how long until `count` tokens are available.
   *
   * @returns 0 when the tokens are already there, otherwise the time in
   *          milliseconds needed to earn the shortfall at `refillRate`
   *          (`Infinity` when `refillRate` is 0).
   */
  waitTimeMs(count: number = 1): number {
    this.refill();
    if (this.tokens >= count) {
      return 0;
    }
    const needed = count - this.tokens;
    return (needed / this.refillRate) * 1000;
  }
}
┌──────────────┐
│ GridMaker │
└──────┬───────┘
│ sendOrder()
↓
┌──────────────────────────┐
│ ThrottledOrderGateway │
│ │
│ ┌────────────────┐ │
│ │ Token Bucket │ │
│ └────────────────┘ │
│ ↓ │
│ ┌────────────────┐ │
│ │ Request Queue │ │
│ │ [order1, ...] │ │
│ └────────────────┘ │
│ ↓ │
│ ┌────────────────┐ │
│ │ Batch Sender │ │
│ └────────────────┘ │
└──────────┬───────────────┘
│
↓
┌─────────────┐
│ WS Client │
└─────────────┘
// Tuning parameters for one ThrottledOrderGateway instance.
interface ThrottleConfig {
burst: number; // maximum burst size; handed to TokenBucket as its capacity
refillPerSecond: number; // tokens added per second; handed to TokenBucket as its refill rate
maxQueueDepth: number; // maximum queue depth; sendOrder rejects once the queue already holds this many orders
queueTimeoutMs: number; // how long an order may wait in the queue before it is rejected
batchSize?: number; // batch size (optional); not read by any code visible in this document
batchIntervalMs?: number; // batch interval (optional); pause applied after each dispatched order
}
/**
 * Rate-limited front end for order submission.
 *
 * Orders are appended to a FIFO queue and released one at a time as the token
 * bucket allows. Each call to `sendOrder` returns a promise that settles with
 * the order id once the WebSocket client answers, or rejects when the queue is
 * full, the order waits longer than `queueTimeoutMs`, or the send fails.
 */
class ThrottledOrderGateway {
  private readonly tokenBucket: TokenBucket;
  private queue: QueuedOrder[] = [];
  private processing = false;
  // Handle of the background poll so it can be cancelled by stop().
  private pollTimer: ReturnType<typeof setInterval> | undefined;

  /**
   * @param wsClient Client used to transmit `create_order` requests.
   * @param config   Throttle parameters; `burst` and `refillPerSecond` are
   *                 copied into the token bucket at construction time.
   *                 Exposed as `protected` so subclasses can read it.
   * @param logger   Structured logger.
   */
  constructor(
    private wsClient: WebSocketClient,
    protected config: ThrottleConfig,
    private logger: Logger
  ) {
    this.tokenBucket = new TokenBucket(
      config.burst,
      config.refillPerSecond
    );
    // Start the background processing loop.
    this.startProcessingLoop();
  }

  /**
   * Enqueue an order for throttled delivery.
   *
   * @returns Promise resolving to the order id reported by the WebSocket client.
   * @throws Error (as a rejection) when the queue already holds
   *         `maxQueueDepth` orders.
   */
  async sendOrder(order: Order): Promise<string> {
    // Fail fast when the queue is full.
    if (this.queue.length >= this.config.maxQueueDepth) {
      throw new Error(`Queue depth exceeded: ${this.queue.length}`);
    }
    // Park the promise callbacks next to the order so the sender can settle them.
    return new Promise((resolve, reject) => {
      const queued: QueuedOrder = {
        order,
        resolve,
        reject,
        enqueuedAt: Date.now()
      };
      this.queue.push(queued);
      // Kick the processor right away instead of waiting for the next poll.
      setImmediate(() => void this.processQueue());
    });
  }

  /**
   * Drain the queue while tokens are available. Guarded by `processing` so at
   * most one drain loop runs at a time.
   */
  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) {
      return;
    }
    this.processing = true;
    try {
      while (this.queue.length > 0) {
        // Drop orders that have waited too long.
        this.pruneTimedOutOrders();
        // Pruning may have emptied the queue; stop before spending a token
        // on (and dispatching) a non-existent order.
        if (this.queue.length === 0) {
          break;
        }
        // Try to obtain a token.
        if (!this.tokenBucket.tryConsume(1)) {
          const waitMs = this.tokenBucket.waitTimeMs(1);
          this.logger.debug({
            queueDepth: this.queue.length,
            waitMs: Math.ceil(waitMs)
          }, 'Throttle: waiting for token');
          // Wait for a refill, but wake at least every 100ms so queue
          // timeouts are still enforced while no tokens arrive.
          await sleep(Math.min(waitMs, 100));
          continue;
        }
        // Take the oldest order.
        const queued = this.queue.shift();
        if (queued === undefined) {
          break;
        }
        // Fire and forget: sendOrderAsync settles the order's promise itself.
        void this.sendOrderAsync(queued);
        // Optional spacing between consecutive sends.
        if (this.config.batchIntervalMs) {
          await sleep(this.config.batchIntervalMs);
        }
      }
    } finally {
      this.processing = false;
    }
  }

  /**
   * Transmit one order and settle its promise. Never rejects: send failures
   * are forwarded to `queued.reject` and logged. `protected` so subclasses
   * can wrap it.
   */
  protected async sendOrderAsync(queued: QueuedOrder): Promise<void> {
    try {
      const result = await this.wsClient.send({
        method: 'create_order',
        params: queued.order
      });
      queued.resolve(result.orderId);
      this.logger.debug({
        orderId: result.orderId,
        queuedMs: Date.now() - queued.enqueuedAt
      }, 'Order sent successfully');
    } catch (error) {
      queued.reject(error);
      this.logger.error({
        error,
        queuedMs: Date.now() - queued.enqueuedAt
      }, 'Order send failed');
    }
  }

  /** Reject and remove orders at the head of the queue older than `queueTimeoutMs`. */
  private pruneTimedOutOrders(): void {
    const now = Date.now();
    const timeout = this.config.queueTimeoutMs;
    while (this.queue.length > 0) {
      const queued = this.queue[0];
      const age = now - queued.enqueuedAt;
      if (age > timeout) {
        this.queue.shift();
        queued.reject(new Error(`Queue timeout after ${age}ms`));
        this.logger.warn({ age }, 'Order timed out in queue');
      } else {
        break; // The queue is FIFO, so everything behind is younger.
      }
    }
  }

  /** Start the 50ms poll that restarts the drain loop if it is idle with work pending. */
  private startProcessingLoop(): void {
    this.pollTimer = setInterval(() => {
      if (this.queue.length > 0 && !this.processing) {
        void this.processQueue();
      }
    }, 50); // Check every 50ms.
  }

  /**
   * Cancel the background poll so the gateway no longer keeps a timer alive.
   * Safe to call more than once. A drain loop that is already running
   * continues until the queue is empty.
   */
  stop(): void {
    if (this.pollTimer !== undefined) {
      clearInterval(this.pollTimer);
      this.pollTimer = undefined;
    }
  }

  /** Snapshot of queue depth, whole tokens available, and whether a drain loop is running. */
  getStatus() {
    return {
      queueDepth: this.queue.length,
      tokensAvailable: this.tokenBucket.available(),
      processing: this.processing
    };
  }
}
// One pending order together with the callbacks that settle the promise
// returned by ThrottledOrderGateway.sendOrder.
interface QueuedOrder {
order: Order;
// Called with the order id once the send succeeds.
resolve: (orderId: string) => void;
// Called when the send fails or the order times out in the queue.
reject: (error: Error) => void;
// Date.now() timestamp (ms) taken when the order entered the queue.
enqueuedAt: number;
}
/**
 * Routes orders to a per-account ThrottledOrderGateway so that every account
 * is throttled independently of the others.
 */
class MultiAccountThrottleManager {
  private gateways = new Map<string, ThrottledOrderGateway>();

  /**
   * @param wsClients One WebSocket client per account id.
   * @param configs   Optional per-account throttle settings; accounts without
   *                  an entry fall back to the built-in defaults.
   */
  constructor(
    private wsClients: Map<string, WebSocketClient>,
    private configs: Map<string, ThrottleConfig>
  ) {
    // Build one dedicated gateway for each known account.
    wsClients.forEach((client, id) => {
      const accountConfig = configs.get(id) ?? this.getDefaultConfig();
      const gateway = new ThrottledOrderGateway(client, accountConfig, logger);
      this.gateways.set(id, gateway);
    });
  }

  /**
   * Forward an order to the gateway of its account.
   * Orders without an `accountId` are routed to the account named 'default'.
   *
   * @throws Error (as a rejection) when no gateway exists for the account.
   */
  async sendOrder(order: Order): Promise<string> {
    const target = order.accountId ?? 'default';
    const gateway = this.gateways.get(target);
    if (gateway) {
      return gateway.sendOrder(order);
    }
    throw new Error(`No gateway for account: ${target}`);
  }

  /** Settings used for accounts that have no explicit configuration. */
  private getDefaultConfig(): ThrottleConfig {
    const defaults: ThrottleConfig = {
      burst: 10,
      refillPerSecond: 5,
      maxQueueDepth: 50,
      queueTimeoutMs: 5000
    };
    return defaults;
  }
}
execution:
throttle:
# Default configuration
default:
burst: 10
refill_per_second: 5
max_queue_depth: 50
queue_timeout_ms: 5000
# Inner-ring account (aggressive)
inner_maker:
burst: 15 # larger burst
refill_per_second: 8 # faster refill
max_queue_depth: 100
queue_timeout_ms: 3000
# Outer-ring account (conservative)
outer_maker:
burst: 8
refill_per_second: 4
max_queue_depth: 30
queue_timeout_ms: 10000
# Hedging account (critical)
hedger:
burst: 20 # highest priority
refill_per_second: 10
max_queue_depth: 5 # small queue, fail fast
queue_timeout_ms: 2000
// Factory for a gauge labelled only by account id.
const accountGauge = (name: string, help: string) =>
  new Gauge({ name, help, labelNames: ['account_id'] });

// Factory for a counter labelled only by account id.
const accountCounter = (name: string, help: string) =>
  new Counter({ name, help, labelNames: ['account_id'] });

/**
 * Metrics published by the order throttle. Every metric carries an
 * `account_id` label; `orders_sent_total` additionally carries `status`.
 */
export const throttleMetrics = {
  // Token bucket state
  tokens_available: accountGauge(
    'throttle_tokens_available',
    'Current tokens in bucket'
  ),

  // Queue state
  queue_depth: accountGauge('throttle_queue_depth', 'Current queue depth'),
  queue_depth_max: accountGauge(
    'throttle_queue_depth_max',
    'Maximum queue depth observed'
  ),

  // Latency distribution
  queue_wait_time_ms: new Histogram({
    name: 'throttle_queue_wait_time_ms',
    help: 'Time spent in queue',
    labelNames: ['account_id'],
    buckets: [10, 50, 100, 500, 1000, 2000, 5000]
  }),

  // Timeouts and rejections
  queue_timeout_total: accountCounter(
    'throttle_queue_timeout_total',
    'Total orders timed out in queue'
  ),
  queue_rejected_total: accountCounter(
    'throttle_queue_rejected_total',
    'Total orders rejected (queue full)'
  ),

  // Throughput
  orders_sent_total: new Counter({
    name: 'throttle_orders_sent_total',
    help: 'Total orders sent through throttle',
    labelNames: ['account_id', 'status']
  }),
  send_rate_per_second: accountGauge(
    'throttle_send_rate_per_second',
    'Current order send rate'
  )
};
{
"dashboard": {
"title": "Order Throttling",
"panels": [
{
"title": "Queue Depth by Account",
"targets": [{
"expr": "throttle_queue_depth"
}],
"type": "graph"
},
{
"title": "Tokens Available",
"targets": [{
"expr": "throttle_tokens_available"
}],
"type": "graph"
},
{
"title": "Queue Wait Time (p95)",
"targets": [{
"expr": "histogram_quantile(0.95, rate(throttle_queue_wait_time_ms_bucket[5m]))"
}],
"type": "graph"
},
{
"title": "Send Rate",
"targets": [{
"expr": "rate(throttle_orders_sent_total[1m])"
}],
"type": "graph"
}
]
}
}
# Alerting rules for the order throttle, evaluated every 30s.
groups:
- name: throttling
interval: 30s
rules:
# Warn when the queue depth stays above 30 for 2 minutes.
# NOTE(review): the sample throttle config caps outer_maker at max_queue_depth 30
# and hedger at 5, so this threshold presumably cannot be exceeded for those
# accounts - confirm whether a per-account threshold is intended.
- alert: ThrottleQueueDepthHigh
expr: throttle_queue_depth > 30
for: 2m
labels:
severity: warning
annotations:
summary: "Throttle queue depth high for {{ $labels.account_id }}"
description: "Current depth: {{ $value }}"
# Critical when more than 0.1 queue timeouts per second (5m rate) persist for 1 minute.
- alert: ThrottleQueueTimeout
expr: rate(throttle_queue_timeout_total[5m]) > 0.1
for: 1m
labels:
severity: critical
annotations:
summary: "Orders timing out in throttle queue"
description: "{{ $value }} timeouts per second"
# Critical when any queue-full rejection rate persists for 30 seconds.
- alert: ThrottleQueueRejected
expr: rate(throttle_queue_rejected_total[5m]) > 0
for: 30s
labels:
severity: critical
annotations:
summary: "Orders rejected due to full queue"
description: "Queue capacity exceeded"
# Warn when the p95 queue wait time stays above 2000ms for 3 minutes.
- alert: ThrottleWaitTimeHigh
expr: histogram_quantile(0.95, rate(throttle_queue_wait_time_ms_bucket[5m])) > 2000
for: 3m
labels:
severity: warning
annotations:
summary: "High queue wait time (p95 > 2s)"
description: "p95 wait: {{ $value }}ms"
describe('TokenBucket', () => {
  /**
   * Run `body` with `Date.now` replaced by a manually advanced clock, so the
   * refill arithmetic is exercised deterministically instead of depending on
   * real timer accuracy. The real `Date.now` is always restored.
   */
  const withManualClock = (body: (advance: (ms: number) => void) => void): void => {
    const realNow = Date.now;
    let now = realNow();
    Date.now = () => now;
    try {
      body((ms) => {
        now += ms;
      });
    } finally {
      Date.now = realNow;
    }
  };

  it('should allow burst up to capacity', () => {
    withManualClock(() => {
      const bucket = new TokenBucket(10, 5);
      for (let i = 0; i < 10; i++) {
        expect(bucket.tryConsume(1)).toBe(true);
      }
      // No time has passed, so nothing was refilled.
      expect(bucket.tryConsume(1)).toBe(false);
    });
  });

  it('should refill over time', () => {
    withManualClock((advance) => {
      const bucket = new TokenBucket(10, 10); // 10 tokens/sec
      // Drain every token.
      for (let i = 0; i < 10; i++) {
        bucket.tryConsume(1);
      }
      expect(bucket.tryConsume(1)).toBe(false);
      // Exactly 0.5s later the bucket must hold 5 tokens.
      advance(500);
      expect(bucket.available()).toBe(5);
    });
  });
});
// Behavioural tests for ThrottledOrderGateway.
// The `{ symbol: 'BTC', ... }` literals are placeholders for a full order object.
describe('ThrottledOrderGateway', () => {
it('should queue orders when tokens exhausted', async () => {
const gateway = new ThrottledOrderGateway(
mockWsClient,
{ burst: 5, refillPerSecond: 2, maxQueueDepth: 20, queueTimeoutMs: 5000 },
logger
);
// Submit 10 orders (more than the burst of 5)
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(gateway.sendOrder({ symbol: 'BTC', ... }));
}
// The first 5 should be dispatched immediately; at 2 tokens/sec no further
// token is earned within 100ms, so 5 remain queued
await sleep(100);
expect(gateway.getStatus().queueDepth).toBe(5);
// Wait for the tokens to refill and the rest to be sent
await Promise.all(promises);
expect(gateway.getStatus().queueDepth).toBe(0);
});
it('should timeout old orders', async () => {
// burst 0 and refill 0 mean a token is never available, so the order can
// only leave the queue through the queue timeout
const gateway = new ThrottledOrderGateway(
mockWsClient,
{ burst: 0, refillPerSecond: 0, maxQueueDepth: 10, queueTimeoutMs: 1000 },
logger
);
const promise = gateway.sendOrder({ symbol: 'BTC', ... });
await expect(promise).rejects.toThrow(/timeout/i);
});
});
describe('ThrottledOrderGateway stress test', () => {
  // The run itself takes roughly 4.5s of real time, which sits right at
  // Jest's default 5s per-test timeout; give it explicit headroom.
  const STRESS_TEST_TIMEOUT_MS = 15000;

  it('should handle 100 concurrent orders gracefully', async () => {
    const gateway = new ThrottledOrderGateway(
      mockWsClient,
      { burst: 10, refillPerSecond: 20, maxQueueDepth: 100, queueTimeoutMs: 10000 },
      logger
    );
    const start = Date.now();
    const promises: Promise<string>[] = [];
    for (let i = 0; i < 100; i++) {
      promises.push(gateway.sendOrder({ symbol: 'BTC', ... }));
    }
    const results = await Promise.allSettled(promises);
    const elapsed = Date.now() - start;
    const succeeded = results.filter(r => r.status === 'fulfilled').length;
    const failed = results.filter(r => r.status === 'rejected').length;
    console.log({
      total: 100,
      succeeded,
      failed,
      elapsedMs: elapsed,
      avgMs: elapsed / 100
    });
    // Expected: the first 10 orders use the burst, the remaining 90 are paced
    // at 20 per second => about 4.5 seconds in total.
    expect(elapsed).toBeGreaterThan(4000);
    expect(elapsed).toBeLessThan(6000);
    expect(succeeded).toBe(100);
  }, STRESS_TEST_TIMEOUT_MS);
});
throttle:
inner_maker:
burst: 20 # allow the grid to be laid out quickly
refill_per_second: 5 # steady-state rate
throttle:
hedger:
burst: 5
refill_per_second: 10 # high refill rate
queue_timeout_ms: 1000 # fail fast
throttle:
inner_maker:
burst: 8
refill_per_second: 3 # matches the batch interval
batch_interval_ms: 333 # ≈ 1 / refill_per_second
可能原因:
1. refill_per_second 设置过低
2. 交易所实际限流比配置更严
3. 网络延迟导致积压
排查步骤:
1. 检查 send_rate_per_second 实际值
2. 调高 refill_per_second 或降低 burst
3. 检查 WebSocket 连接质量
可能原因:
1. queue_timeout_ms 设置过短
2. burst + refill 配置不合理
3. 队列深度不够
排查步骤:
1. 增加 queue_timeout_ms
2. 增加 max_queue_depth
3. 降低订单发送速率
# Temporarily disable all throttling (emergency use only)
curl -X POST http://localhost:3000/api/throttle/disable
# Restore the default configuration
curl -X POST http://localhost:3000/api/throttle/reset
# Adjust the refill rate at runtime
# NOTE(review): curl -d sends Content-Type application/x-www-form-urlencoded by
# default; if the endpoint expects JSON, -H 'Content-Type: application/json'
# is presumably required - confirm against the API.
curl -X POST http://localhost:3000/api/throttle/config \
-d '{"account_id": "inner_maker", "refill_per_second": 10}'
根据实际 RTT 和成功率动态调整参数:
/**
 * Gateway variant that tunes `config.refillPerSecond` from observed send
 * latency and send failures.
 *
 * NOTE(review): ThrottledOrderGateway copies `config.refillPerSecond` into its
 * TokenBucket once, in the constructor, so changing the config value here does
 * not by itself alter the live bucket's refill rate - the bucket needs to be
 * rebuilt or given a rate setter for the adaptation to take effect.
 * NOTE(review): this subclass relies on the base class exposing `config` and
 * `sendOrderAsync` to subclasses (non-private) - confirm.
 */
class AdaptiveThrottle extends ThrottledOrderGateway {
  // Sliding-window size; bounds memory and keeps the p95 responsive.
  private static readonly MAX_LATENCY_SAMPLES = 100;
  // Allowed range for the adapted refill rate (tokens per second).
  private static readonly MIN_REFILL_PER_SECOND = 1;
  private static readonly MAX_REFILL_PER_SECOND = 20;
  // Multiplier applied after a failed send, which may signal rate limiting.
  private static readonly FAILURE_BACKOFF = 0.5;

  private recentLatencies: number[] = [];

  /**
   * Send one order through the base implementation while measuring it.
   *
   * The base method never rejects: it reports failures through
   * `queued.reject`. The reject callback is therefore wrapped to observe
   * failures, instead of using a try/catch that could never fire.
   */
  async sendOrderAsync(queued: QueuedOrder): Promise<void> {
    const start = Date.now();
    let failed = false;
    const tracked: QueuedOrder = {
      ...queued,
      reject: (error: Error) => {
        failed = true;
        queued.reject(error);
      }
    };
    await super.sendOrderAsync(tracked);
    if (failed) {
      // A failure may be a rate-limit signal from the exchange.
      this.decreaseRate();
      return;
    }
    this.recentLatencies.push(Date.now() - start);
    if (this.recentLatencies.length > AdaptiveThrottle.MAX_LATENCY_SAMPLES) {
      this.recentLatencies.shift();
    }
    this.adjustRateIfNeeded();
  }

  /** Lower the rate when p95 latency exceeds 2000ms; raise it when below 500ms. */
  private adjustRateIfNeeded(): void {
    const p95 = percentile(this.recentLatencies, 0.95);
    if (p95 > 2000) {
      // Latency too high: slow down.
      this.config.refillPerSecond *= 0.8;
    } else if (p95 < 500) {
      // Latency healthy: probe for more throughput.
      this.config.refillPerSecond *= 1.1;
    }
    this.clampRate();
  }

  /** Back off multiplicatively after a failed send. */
  private decreaseRate(): void {
    this.config.refillPerSecond *= AdaptiveThrottle.FAILURE_BACKOFF;
    this.clampRate();
  }

  /** Keep the adapted rate inside the allowed range. */
  private clampRate(): void {
    this.config.refillPerSecond = clamp(
      this.config.refillPerSecond,
      AdaptiveThrottle.MIN_REFILL_PER_SECOND,
      AdaptiveThrottle.MAX_REFILL_PER_SECOND
    );
  }
}
多实例场景下的全局协调:
// Sketch of a token bucket shared between several process instances through
// Redis, keyed per account.
// NOTE(review): the Lua script below is a placeholder - the bucket logic is
// elided and `tokens` is never assigned in the visible script, so it cannot
// run as written.
class DistributedThrottle {
constructor(
private redis: Redis,
private accountId: string,
private config: ThrottleConfig
) {}
// Ask the shared bucket for `count` tokens; resolves to true when granted.
async tryConsume(count: number): Promise<boolean> {
const key = `throttle:${this.accountId}`;
// A Lua script is used so the read-modify-write of the bucket is atomic.
// NOTE(review): `now` is taken from this process's clock (Date.now()), so
// instances with skewed clocks would disagree on elapsed time - confirm
// whether server-side time should be used instead.
const result = await this.redis.eval(`
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('HGETALL', key)
-- ... token bucket logic ...
return tokens >= requested
`, 1, key, this.config.burst, this.config.refillPerSecond, count, Date.now());
// Presumably relies on Redis converting a Lua `true` into integer 1 (and
// `false` into nil) - verify against the client in use.
return result === 1;
}
}
packages/execution/src/throttledOrderGateway.ts # 核心实现
packages/execution/src/tokenBucket.ts # 令牌桶
packages/telemetry/src/throttleMetrics.ts # 监控指标
config/throttle.yaml # 配置示例