M1.6 Placement Throttling 2.0 Design Document

Version: v1.0.0 | Date: 2025-10-09 | Owner: Execution Layer Team | Depends on: M16_INCREMENTAL_GRID_DESIGN.md, MODULE_INTERFACES.md


1. Background and Problem Statement

1.1 Current Rate-Limiting Problem

Rate-limiting problems observed in live-trading logs:

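Initialization (20 orders):
  - Duration: 1093ms
  - Average per order: 55ms
  - Status: ✅ normal

Adjustment 1 (20 orders):
  - Duration: 58,241ms (~58s)
  - Average per order: 2912ms
  - Status: ⚠️ rate-limit warning

Adjustment 2 (20 orders):
  - Duration: 118,668ms (~119s)
  - Average per order: 5933ms
  - Status: 🚫 severe throttling

Adjustment 3 (20 orders):
  - Duration: 178,227ms (~178s)
  - Average per order: 8911ms
  - Status: 🚫 extreme throttling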

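Root-cause analysis:

  1. Unthrottled concurrent burst: all 20 orders are sent at once
  2. The exchange's rate-limit penalty kicks in: every request is processed with an added delay
  3. Penalties accumulate: each later round is delayed more than the one before
  4. Vicious cycle: retries add to the congestion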

1.2 Exchange Rate-Limit Policy (Inferred)

Estimated parameters:
- Burst capacity: ~10 requests
- Refill rate: ~5 requests/second
- Penalty window: ~60 seconds
- Penalty multiplier: +500ms delay for each request over the limit

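Simulated calculation:

First batch, 20 requests sent simultaneously:
  - First 10: processed normally (~100ms each)
  - Last 10: rate-limited
    - 11th: +500ms delay
    - 12th: +1000ms delay
    - ...
    - 20th: +5000ms delay

Average latency: (10×100 + 10×2750) / 20 = 1425ms per request, where 2750ms is the mean penalty of the 11th to 20th requests ((500 + 5000) / 2)
Cumulative latency (sum over all 20 requests): 10×100 + 10×2750 = 28,500ms (~28s)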
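The calculation above can be reproduced with a short TypeScript sketch. `simulateBurst` is a hypothetical helper. Its defaults are the inferred parameters from 1.2, not documented exchange limits. Like the text, it charges each rate-limited request only its penalty delay.

```typescript
// Hypothetical helper reproducing the simulated calculation under the
// *inferred* exchange parameters (estimates, not documented limits).
interface BurstEstimate {
  latenciesMs: number[];
  totalMs: number; // sum of per-request latencies
  avgMs: number;
  maxMs: number;   // slowest single request
}

function simulateBurst(
  requests: number,
  burstCapacity = 10,
  baseMs = 100,
  penaltyStepMs = 500
): BurstEstimate {
  const latenciesMs: number[] = [];
  for (let i = 1; i <= requests; i++) {
    // How far past the burst capacity this request is (<= 0 means within it)
    const overLimit = i - burstCapacity;
    // Within the burst: normal latency; beyond it: k-th excess request gets k × penalty
    latenciesMs.push(overLimit > 0 ? overLimit * penaltyStepMs : baseMs);
  }
  const totalMs = latenciesMs.reduce((sum, ms) => sum + ms, 0);
  return {
    latenciesMs,
    totalMs,
    avgMs: totalMs / requests,
    maxMs: Math.max(...latenciesMs)
  };
}

const est = simulateBurst(20);
console.log(est.totalMs, est.avgMs, est.maxMs); // 28500 1425 5000
```

The 28.5s figure is cumulative latency. The slowest single request waits about 5s. For comparison, the logged round times in 1.1 grow by roughly 60s per adjustment (58s, 119s, 178s). That pattern is consistent with the penalty accumulation listed in the root-cause analysis.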

2. Solution Design

2.1 Token Bucket Algorithm

2.1.1 How It Works

┌──────────────────┐
│  Token Bucket    │
│  Capacity: 10    │  ← burst capacity
│  Tokens: 7       │  ← current token count
│  Refill: 5/s     │  ← refill rate
└──────────────────┘
        ↑
        │ refill
        │
   ┌────┴────┐
   │  Timer  │
   └─────────┘

Each request consumes 1 token:
- Token available → send immediately
- No token available → wait in the queue

2.1.2 Core Implementation

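class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,     // burst capacity
    private refillRate: number             // tokens per second (see setRefillRate)
  ) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  // Try to consume `count` tokens; returns false and consumes nothing if too few.
  // A request for more than `capacity` tokens can never succeed.
  tryConsume(count: number = 1): boolean {
    this.refill();

    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  // Add tokens for the time elapsed since the last refill, capped at capacity
  private refill(): void {
    const now = Date.now();
    const elapsedMs = now - this.lastRefill;
    const elapsedSec = elapsedMs / 1000;

    const tokensToAdd = elapsedSec * this.refillRate;
    this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
    this.lastRefill = now;
  }

  // Number of whole tokens currently available
  available(): number {
    this.refill();
    return Math.floor(this.tokens);
  }

  // Estimated wait (ms) until `count` tokens are available; Infinity if refillRate is 0
  waitTimeMs(count: number = 1): number {
    this.refill();

    if (this.tokens >= count) {
      return 0;
    }

    const needed = count - this.tokens;
    return (needed / this.refillRate) * 1000;
  }

  // Change the refill rate (used by the adaptive throttle in 6.1).
  // Tokens earned so far are credited at the old rate first.
  setRefillRate(tokensPerSecond: number): void {
    this.refill();
    this.refillRate = tokensPerSecond;
  }
}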

2.2 ThrottledOrderGateway Design

2.2.1 Architecture

┌──────────────┐
│   GridMaker  │
└──────┬───────┘
       │ sendOrder()
       ↓
┌──────────────────────────┐
│ ThrottledOrderGateway    │
│                          │
│  ┌────────────────┐      │
│  │  Token Bucket  │      │
│  └────────────────┘      │
│          ↓               │
│  ┌────────────────┐      │
│  │  Request Queue │      │
│  │  [order1, ...] │      │
│  └────────────────┘      │
│          ↓               │
│  ┌────────────────┐      │
│  │  Batch Sender  │      │
│  └────────────────┘      │
└──────────┬───────────────┘
           │
           ↓
    ┌─────────────┐
    │  WS Client  │
    └─────────────┘

2.2.2 Implementation

interface ThrottleConfig {
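  burst: number;               // max burst size (bucket capacity)
  refillPerSecond: number;     // tokens refilled per second
  maxQueueDepth: number;       // max queued orders before new ones are rejected
  queueTimeoutMs: number;      // max time an order may wait in the queue
  batchSize?: number;          // batch size (optional; not used by the gateway below)
  batchIntervalMs?: number;    // pause after each send (optional)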
}

// Order, WebSocketClient and Logger are project types defined elsewhere in the codebase.
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

class ThrottledOrderGateway {
  // protected (not private) so subclasses such as AdaptiveThrottle (6.1) can use them
  protected tokenBucket: TokenBucket;
  private queue: QueuedOrder[] = [];
  private processing = false;
  private timer?: ReturnType<typeof setInterval>;

  constructor(
    private wsClient: WebSocketClient,
    protected config: ThrottleConfig,
    private logger: Logger
  ) {
    this.tokenBucket = new TokenBucket(
      config.burst,
      config.refillPerSecond
    );

    // Start the background processing loop
    this.startProcessingLoop();
  }

  async sendOrder(order: Order): Promise<string> {
    // Fail fast when the queue is full (back-pressure)
    if (this.queue.length >= this.config.maxQueueDepth) {
      throw new Error(`Queue depth exceeded: ${this.queue.length}`);
    }

    // Enqueue with the promise callbacks; settled once the order is sent or fails
    return new Promise((resolve, reject) => {
      const queued: QueuedOrder = {
        order,
        resolve,
        reject,
        enqueuedAt: Date.now()
      };

      this.queue.push(queued);

      // Try to process right away
      setImmediate(() => void this.processQueue());
    });
  }

  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) {
      return;
    }

    this.processing = true;

    try {
      while (this.queue.length > 0) {
        // Drop orders that have waited longer than queueTimeoutMs
        this.pruneTimedOutOrders();

        // Pruning may empty the queue; stop before consuming a token
        if (this.queue.length === 0) {
          break;
        }

        // Try to take a token
        if (!this.tokenBucket.tryConsume(1)) {
          const waitMs = this.tokenBucket.waitTimeMs(1);

          this.logger.debug({
            queueDepth: this.queue.length,
            waitMs: Math.ceil(waitMs)
          }, 'Throttle: waiting for token');

          // Wait for a refill; cap at 100ms so timeouts keep being checked
          await sleep(Math.min(waitMs, 100));
          continue;
        }

        // Dequeue the oldest order
        const queued = this.queue.shift()!;

        // Send without blocking the loop; sendOrderAsync handles its own errors
        void this.sendOrderAsync(queued);

        // Optional pause after each send
        if (this.config.batchIntervalMs) {
          await sleep(this.config.batchIntervalMs);
        }
      }
    } finally {
      this.processing = false;
    }
  }

  protected async sendOrderAsync(queued: QueuedOrder): Promise<void> {
    try {
      const result = await this.wsClient.send({
        method: 'create_order',
        params: queued.order
      });

      queued.resolve(result.orderId);

      this.logger.debug({
        orderId: result.orderId,
        queuedMs: Date.now() - queued.enqueuedAt
      }, 'Order sent successfully');
    } catch (error) {
      // `error` is `unknown` under strict TypeScript; normalize it before rejecting
      queued.reject(error instanceof Error ? error : new Error(String(error)));

      this.logger.error({
        error,
        queuedMs: Date.now() - queued.enqueuedAt
      }, 'Order send failed');
    }
  }

  private pruneTimedOutOrders(): void {
    const now = Date.now();
    const timeout = this.config.queueTimeoutMs;

    while (this.queue.length > 0) {
      const queued = this.queue[0];
      const age = now - queued.enqueuedAt;

      if (age > timeout) {
        this.queue.shift();
        queued.reject(new Error(`Queue timeout after ${age}ms`));

        this.logger.warn({ age }, 'Order timed out in queue');
      } else {
        break; // FIFO queue: every order behind this one is younger
      }
    }
  }

  private startProcessingLoop(): void {
    // Safety net: poll every 50ms in case no setImmediate trigger is pending
    this.timer = setInterval(() => {
      if (this.queue.length > 0 && !this.processing) {
        void this.processQueue();
      }
    }, 50);
  }

  // Stop the background loop (on shutdown, or at the end of a test)
  close(): void {
    clearInterval(this.timer);
  }

  // Snapshot for monitoring and tests
  getStatus() {
    return {
      queueDepth: this.queue.length,
      tokensAvailable: this.tokenBucket.available(),
      processing: this.processing
    };
  }
}

interface QueuedOrder {
  order: Order;
  resolve: (orderId: string) => void;
  reject: (error: Error) => void;
  enqueuedAt: number;
}

2.3 Layered Throttling Strategy

2.3.1 Independent Per-Account Rate Limiting

class MultiAccountThrottleManager {
  private gateways = new Map<string, ThrottledOrderGateway>();

  constructor(
    private wsClients: Map<string, WebSocketClient>,
    private configs: Map<string, ThrottleConfig>,
    private logger: Logger   // passed to every per-account gateway
  ) {
    // One independent gateway (and token bucket) per account
    for (const [accountId, wsClient] of wsClients) {
      const config = configs.get(accountId) ?? this.getDefaultConfig();

      this.gateways.set(
        accountId,
        new ThrottledOrderGateway(wsClient, config, logger)
      );
    }
  }

  async sendOrder(order: Order): Promise<string> {
    const accountId = order.accountId ?? 'default';
    const gateway = this.gateways.get(accountId);

    if (!gateway) {
      throw new Error(`No gateway for account: ${accountId}`);
    }

    return gateway.sendOrder(order);
  }

  private getDefaultConfig(): ThrottleConfig {
    return {
      burst: 10,
      refillPerSecond: 5,
      maxQueueDepth: 50,
      queueTimeoutMs: 5000
    };
  }
}

2.3.2 Differentiated Inner/Outer-Ring Configuration

execution:
  throttle:
    # Default profile
    default:
      burst: 10
      refill_per_second: 5
      max_queue_depth: 50
      queue_timeout_ms: 5000

    # Inner-ring account (aggressive)
    inner_maker:
      burst: 15             # larger burst
      refill_per_second: 8  # faster refill
      max_queue_depth: 100
      queue_timeout_ms: 3000

    # Outer-ring account (conservative)
    outer_maker:
      burst: 8
      refill_per_second: 4
      max_queue_depth: 30
      queue_timeout_ms: 10000

    # Hedging account (critical)
    hedger:
      burst: 20             # largest allowance (highest priority)
      refill_per_second: 10
      max_queue_depth: 5    # small queue: fail fast
      queue_timeout_ms: 2000
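The YAML keys are snake_case, while `ThrottleConfig` is camelCase, so something has to map one onto the other. The sketch below is minimal. It assumes three things:

- The `execution.throttle` section has already been parsed into a plain object.
- Fields omitted from a role entry inherit from `default`.
- Account IDs match the role keys, so the resulting map could be passed as the `configs` argument of `MultiAccountThrottleManager`.

`resolveThrottleConfigs` and `RawThrottleEntry` are hypothetical names.

```typescript
// Mirrors ThrottleConfig from 2.2.2 so the sketch compiles on its own.
interface ThrottleConfig {
  burst: number;
  refillPerSecond: number;
  maxQueueDepth: number;
  queueTimeoutMs: number;
  batchSize?: number;
  batchIntervalMs?: number;
}

// One entry under execution.throttle after YAML parsing (snake_case keys).
interface RawThrottleEntry {
  burst?: number;
  refill_per_second?: number;
  max_queue_depth?: number;
  queue_timeout_ms?: number;
  batch_size?: number;
  batch_interval_ms?: number;
}

function requireField(value: number | undefined, key: string, field: string): number {
  if (value === undefined) {
    throw new Error(`execution.throttle.${key}.${field} is missing`);
  }
  return value;
}

// Hypothetical loader: role entries inherit missing fields from `default` (assumption).
function resolveThrottleConfigs(
  raw: Record<string, RawThrottleEntry>
): Map<string, ThrottleConfig> {
  const base: RawThrottleEntry = raw['default'] ?? {};
  const configs = new Map<string, ThrottleConfig>();

  for (const [key, entry] of Object.entries(raw)) {
    const merged: RawThrottleEntry = { ...base, ...entry };
    const config: ThrottleConfig = {
      burst: requireField(merged.burst, key, 'burst'),
      refillPerSecond: requireField(merged.refill_per_second, key, 'refill_per_second'),
      maxQueueDepth: requireField(merged.max_queue_depth, key, 'max_queue_depth'),
      queueTimeoutMs: requireField(merged.queue_timeout_ms, key, 'queue_timeout_ms'),
      batchSize: merged.batch_size,
      batchIntervalMs: merged.batch_interval_ms
    };

    // TokenBucket caps tokens at `burst`, so a burst below 1 could never admit an order
    if (config.burst < 1 || config.refillPerSecond <= 0) {
      throw new Error(`execution.throttle.${key}: burst must be >= 1 and refill_per_second > 0`);
    }
    configs.set(key, config);
  }
  return configs;
}

const configs = resolveThrottleConfigs({
  default: { burst: 10, refill_per_second: 5, max_queue_depth: 50, queue_timeout_ms: 5000 },
  outer_maker: { burst: 8, refill_per_second: 4, max_queue_depth: 30 }
});
// outer_maker has no queue_timeout_ms here, so it inherits 5000 from default
console.log(configs.get('outer_maker'));
```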

3. Monitoring and Alerting

3.1 Prometheus Metrics

import { Counter, Gauge, Histogram } from 'prom-client';

export const throttleMetrics = {
  // Token bucket state
  tokens_available: new Gauge({
    name: 'throttle_tokens_available',
    help: 'Current tokens in bucket',
    labelNames: ['account_id']
  }),

  // Queue state
  queue_depth: new Gauge({
    name: 'throttle_queue_depth',
    help: 'Current queue depth',
    labelNames: ['account_id']
  }),

  queue_depth_max: new Gauge({
    name: 'throttle_queue_depth_max',
    help: 'Maximum queue depth observed',
    labelNames: ['account_id']
  }),

  // Latency
  queue_wait_time_ms: new Histogram({
    name: 'throttle_queue_wait_time_ms',
    help: 'Time spent in queue',
    labelNames: ['account_id'],
    buckets: [10, 50, 100, 500, 1000, 2000, 5000]
  }),

  // Timeouts and rejections
  queue_timeout_total: new Counter({
    name: 'throttle_queue_timeout_total',
    help: 'Total orders timed out in queue',
    labelNames: ['account_id']
  }),

  queue_rejected_total: new Counter({
    name: 'throttle_queue_rejected_total',
    help: 'Total orders rejected (queue full)',
    labelNames: ['account_id']
  }),

  // Throughput
  orders_sent_total: new Counter({
    name: 'throttle_orders_sent_total',
    help: 'Total orders sent through throttle',
    labelNames: ['account_id', 'status']
  }),

  send_rate_per_second: new Gauge({
    name: 'throttle_send_rate_per_second',
    help: 'Current order send rate',
    labelNames: ['account_id']
  })
};

3.2 Grafana Dashboard

{
  "dashboard": {
    "title": "Order Throttling",
    "panels": [
      {
        "title": "Queue Depth by Account",
        "targets": [{
          "expr": "throttle_queue_depth"
        }],
        "type": "graph"
      },
      {
        "title": "Tokens Available",
        "targets": [{
          "expr": "throttle_tokens_available"
        }],
        "type": "graph"
      },
      {
        "title": "Queue Wait Time (p95)",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(throttle_queue_wait_time_ms_bucket[5m]))"
        }],
        "type": "graph"
      },
      {
        "title": "Send Rate",
        "targets": [{
          "expr": "rate(throttle_orders_sent_total[1m])"
        }],
        "type": "graph"
      }
    ]
  }
}

3.3 Alert Rules

groups:
  - name: throttling
    interval: 30s
    rules:
      - alert: ThrottleQueueDepthHigh
        expr: throttle_queue_depth > 30
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Throttle queue depth high for {{ $labels.account_id }}"
          description: "Current depth: {{ $value }}"

      - alert: ThrottleQueueTimeout
        expr: rate(throttle_queue_timeout_total[5m]) > 0.1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Orders timing out in throttle queue"
          description: "{{ $value }} timeouts per second"

      - alert: ThrottleQueueRejected
        expr: rate(throttle_queue_rejected_total[5m]) > 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "Orders rejected due to full queue"
          description: "Queue capacity exceeded"

      - alert: ThrottleWaitTimeHigh
        expr: histogram_quantile(0.95, rate(throttle_queue_wait_time_ms_bucket[5m])) > 2000
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "High queue wait time (p95 > 2s)"
          description: "p95 wait: {{ $value }}ms"

4. Testing and Validation

4.1 Unit Tests

describe('TokenBucket', () => {
  it('should allow burst up to capacity', () => {
    const bucket = new TokenBucket(10, 5);

    for (let i = 0; i < 10; i++) {
      expect(bucket.tryConsume(1)).toBe(true);
    }

    expect(bucket.tryConsume(1)).toBe(false);
  });

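  it('should refill over time', () => {
    // Jest's default ("modern") fake timers also mock Date.now(), so the
    // refill is deterministic instead of depending on a real 500ms sleep
    jest.useFakeTimers();
    const bucket = new TokenBucket(10, 10); // 10 tokens/sec

    // Drain all tokens
    for (let i = 0; i < 10; i++) {
      bucket.tryConsume(1);
    }

    expect(bucket.tryConsume(1)).toBe(false);

    // 0.5s at 10 tokens/s → 5 tokens refilled
    jest.advanceTimersByTime(500);
    expect(bucket.available()).toBe(5);

    jest.useRealTimers();
  });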
});

describe('ThrottledOrderGateway', () => {
  it('should queue orders when tokens exhausted', async () => {
    const gateway = new ThrottledOrderGateway(
      mockWsClient,
      { burst: 5, refillPerSecond: 2, maxQueueDepth: 20, queueTimeoutMs: 5000 },
      logger
    );

    // Send 10 orders (more than the burst of 5)
    const promises = [];
    for (let i = 0; i < 10; i++) {
      promises.push(gateway.sendOrder({ symbol: 'BTC', ... }));
    }

    // The first 5 are sent immediately; the other 5 stay queued
    await sleep(100);
    expect(gateway.getStatus().queueDepth).toBe(5);

    // Wait for refills to send the rest
    await Promise.all(promises);
    expect(gateway.getStatus().queueDepth).toBe(0);
  });

  it('should timeout old orders', async () => {
    const gateway = new ThrottledOrderGateway(
      mockWsClient,
      { burst: 0, refillPerSecond: 0, maxQueueDepth: 10, queueTimeoutMs: 1000 },
      logger
    );

    const promise = gateway.sendOrder({ symbol: 'BTC', ... });

    await expect(promise).rejects.toThrow(/timeout/i);
  });
});

4.2 Stress Test

describe('ThrottledOrderGateway stress test', () => {
  it('should handle 100 concurrent orders gracefully', async () => {
    const gateway = new ThrottledOrderGateway(
      mockWsClient,
      { burst: 10, refillPerSecond: 20, maxQueueDepth: 100, queueTimeoutMs: 10000 },
      logger
    );

    const start = Date.now();
    const promises = [];

    for (let i = 0; i < 100; i++) {
      promises.push(gateway.sendOrder({ symbol: 'BTC', ... }));
    }

    const results = await Promise.allSettled(promises);
    const elapsed = Date.now() - start;

    const succeeded = results.filter(r => r.status === 'fulfilled').length;
    const failed = results.filter(r => r.status === 'rejected').length;

    console.log({
      total: 100,
      succeeded,
      failed,
      elapsedMs: elapsed,
      avgMs: elapsed / 100
    });

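    // Expected: the first 10 orders use the burst; the remaining 90 drain at 20/s ≈ 4.5s
    expect(elapsed).toBeGreaterThan(4000);
    expect(elapsed).toBeLessThan(6000);
    expect(succeeded).toBe(100);
  }, 10_000); // raised above Jest's default 5s timeout, since the test runs ~4.5s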
});

5. Operations Runbook

5.1 Configuration Tuning Guide

Scenario 1: Grid initialization (high burst demand)

throttle:
  inner_maker:
    burst: 20              # allows fast initial grid placement
    refill_per_second: 5   # steady-state rate

Scenario 2: Hedging (latency-sensitive)

throttle:
  hedger:
    burst: 5
    refill_per_second: 10  # high refill rate
    queue_timeout_ms: 1000 # fail fast

Scenario 3: Incremental updates (steady output)

throttle:
  inner_maker:
    burst: 8
    refill_per_second: 3   # matched to the batch interval
    batch_interval_ms: 333 # ≈ 1000 / refill_per_second (ms)
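To sanity-check a tuning choice, we can estimate in closed form when the last of n simultaneously submitted orders leaves the gateway. This is a hypothetical helper with these assumptions:

- The bucket starts full and the queue starts empty.
- Exchange RTT and the gateway's 100ms polling cap are ignored.
- `batchIntervalMs` models the gateway's pause after every send.

```typescript
// Hypothetical tuning helper: when does the last of `orders` simultaneously
// submitted orders leave ThrottledOrderGateway? Assumes a full bucket and an
// empty queue; ignores exchange RTT and the gateway's 100ms polling cap.
function estimateLastSendMs(
  orders: number,
  burst: number,
  refillPerSecond: number,
  batchIntervalMs = 0
): number {
  if (orders <= 0) return 0;
  if (burst < 1 || refillPerSecond <= 0) {
    throw new RangeError('burst must be >= 1 and refillPerSecond > 0');
  }
  // Token bound: the first `burst` orders go at once; each later one waits for a refill
  const tokenBoundMs = (Math.max(0, orders - burst) / refillPerSecond) * 1000;
  // Spacing bound: the gateway sleeps batchIntervalMs after every send
  const spacingBoundMs = (orders - 1) * batchIntervalMs;
  return Math.max(tokenBoundMs, spacingBoundMs);
}

console.log(estimateLastSendMs(20, 20, 5));     // 0    (scenario 1)
console.log(estimateLastSendMs(20, 8, 3, 333)); // 6327 (scenario 3)
console.log(estimateLastSendMs(100, 10, 20));   // 4500 (stress test in 4.2)
```

In scenario 3 the spacing bound dominates. A 333ms pause after every send limits throughput to about 3 orders/s even while tokens are available, so the burst of 8 is never spent at once. With the `default` profile (burst 10, 5/s), a 20-order adjustment like those in 1.1 would be spread over about 2s under these assumptions.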

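5.2 Troubleshooting

Issue 1: Queue depth keeps growing

Possible causes:
1. refill_per_second is set too low
2. The exchange's actual limit is stricter than configured
3. Network latency is causing a backlog

Steps:
1. Check the actual value of throttle_send_rate_per_second
2. Raise refill_per_second (cause 1), or lower burst if the exchange limit is stricter than configured (cause 2)
3. Check WebSocket connection quality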

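Issue 2: Frequent timeouts

Possible causes:
1. queue_timeout_ms is too short
2. burst and refill_per_second do not match the actual order flow
3. max_queue_depth is not matched to the drain rate

Steps:
1. Increase queue_timeout_ms
2. Revisit max_queue_depth: a deeper queue only helps if its backlog can drain within queue_timeout_ms (about burst + refill_per_second × queue_timeout_ms / 1000 orders); beyond that, extra depth turns fast rejections into timeouts
3. Reduce the order submission rate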
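Queue timeouts follow from the same refill arithmetic. Suppose a burst fills the queue while the bucket is full. The k-th order is then sent after about max(0, k - burst) / refill_per_second seconds. So only about burst + refill_per_second × queue_timeout_ms / 1000 orders can be served before timing out. A hypothetical check under that assumption (the helper names are illustrative):

```typescript
// Hypothetical helpers; assume the bucket is full when the burst arrives and
// refill is steady (ignores RTT and the gateway's polling granularity).
interface QueueSettings {
  burst: number;
  refillPerSecond: number;
  maxQueueDepth: number;
  queueTimeoutMs: number;
}

// Deepest queue position whose order is still sent within queueTimeoutMs:
// position k goes out after max(0, k - burst) / refillPerSecond seconds.
function maxDrainableDepth(burst: number, refillPerSecond: number, queueTimeoutMs: number): number {
  return Math.floor(burst + (refillPerSecond * queueTimeoutMs) / 1000);
}

// True when a full queue holds more orders than can drain before timing out.
function queueOutlivesTimeout(s: QueueSettings): boolean {
  return s.maxQueueDepth > maxDrainableDepth(s.burst, s.refillPerSecond, s.queueTimeoutMs);
}

const defaultProfile: QueueSettings = { burst: 10, refillPerSecond: 5, maxQueueDepth: 50, queueTimeoutMs: 5000 };
console.log(maxDrainableDepth(10, 5, 5000));       // 35
console.log(queueOutlivesTimeout(defaultProfile)); // true
```

Applied to the profiles in 2.3.2, `default` (depth 50 vs. 35 drainable) and `inner_maker` (100 vs. 39) would be flagged. `outer_maker` (30 vs. 48) and `hedger` (5 vs. 40) would not.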

5.3 Emergency Degradation

# Temporarily disable all throttling (emergencies only)
curl -X POST http://localhost:3000/api/throttle/disable

# Restore the default configuration
curl -X POST http://localhost:3000/api/throttle/reset

# Adjust the refill rate at runtime
curl -X POST http://localhost:3000/api/throttle/config \
  -H 'Content-Type: application/json' \
  -d '{"account_id": "inner_maker", "refill_per_second": 10}'

6. Future Work

6.1 Adaptive Rate Adjustment

Adjust the parameters dynamically based on observed RTT and success rate:

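// Sketch: assumes ThrottledOrderGateway declares config, tokenBucket and
// sendOrderAsync as protected, and that TokenBucket exposes setRefillRate().
const percentile = (values: number[], p: number): number => {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  return sorted[Math.min(sorted.length - 1, Math.floor(p * sorted.length))];
};

const clamp = (x: number, lo: number, hi: number): number =>
  Math.min(hi, Math.max(lo, x));

class AdaptiveThrottle extends ThrottledOrderGateway {
  private recentLatencies: number[] = [];
  private readonly maxSamples = 50; // sliding window size (assumed)

  protected override async sendOrderAsync(queued: QueuedOrder): Promise<void> {
    const start = Date.now();
    let failed = false;

    // The base class reports failures via queued.reject instead of throwing,
    // so wrap reject to observe the outcome
    await super.sendOrderAsync({
      ...queued,
      reject: (error) => {
        failed = true;
        queued.reject(error);
      }
    });

    if (failed) {
      // A failure may be a rate-limit signal → slow down
      this.setRate(this.config.refillPerSecond * 0.8);
      return;
    }

    this.recentLatencies.push(Date.now() - start);
    if (this.recentLatencies.length > this.maxSamples) {
      this.recentLatencies.shift();
    }
    this.adjustRateIfNeeded();
  }

  private adjustRateIfNeeded(): void {
    const p95 = percentile(this.recentLatencies, 0.95);

    if (p95 > 2000) {
      // Latency too high → slow down
      this.setRate(this.config.refillPerSecond * 0.8);
    } else if (p95 < 500) {
      // Latency healthy → try to speed up
      this.setRate(this.config.refillPerSecond * 1.1);
    }
  }

  // Clamp to [1, 20] tokens/s and push the new rate into the bucket;
  // updating config alone has no effect once the bucket is constructed
  private setRate(rate: number): void {
    this.config.refillPerSecond = clamp(rate, 1, 20);
    this.tokenBucket.setRefillRate(this.config.refillPerSecond);
  }
}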

6.2 Distributed Rate Limiting

Global coordination across multiple instances:

class DistributedThrottle {
  constructor(
    private redis: Redis,
    private accountId: string,
    private config: ThrottleConfig
  ) {}

  async tryConsume(count: number): Promise<boolean> {
    const key = `throttle:${this.accountId}`;

    // A Lua script makes the read-refill-consume step atomic in Redis
    const result = await this.redis.eval(`
      local key = KEYS[1]
      local capacity = tonumber(ARGV[1])
      local refill = tonumber(ARGV[2])
      local requested = tonumber(ARGV[3])
      local now = tonumber(ARGV[4])

      local bucket = redis.call('HGETALL', key)
      -- ... token bucket logic ...

      return tokens >= requested
    `, 1, key, this.config.burst, this.config.refillPerSecond, count, Date.now());

    return result === 1;
  }
}

7. Appendix

7.1 Code Paths

packages/execution/src/throttledOrderGateway.ts  # core implementation
packages/execution/src/tokenBucket.ts            # token bucket
packages/telemetry/src/throttleMetrics.ts        # monitoring metrics
config/throttle.yaml                             # example configuration

7.2 References