market-data-module.md 9.4 KB

Pacifica 市场数据模块

概述

Pacifica市场数据模块是一个基于WebSocket的实时数据流系统,用于获取和管理Pacifica DEX的价格和订单簿数据。该模块提供了高效的内存数据管理和自动重连机制。

核心功能

🔗 WebSocket连接管理

  • 自动连接和重连机制
  • 心跳保活机制
  • 连接状态监控
  • 错误处理和恢复

📊 实时数据流

  • 价格数据: 标记价格、预言机价格、资金费率、交易量等
  • 订单簿数据: 买卖盘深度、价差、市场深度
  • 数据聚合: 支持多种聚合级别 (1, 2, 5, 10, 100, 1000)

💾 内存数据管理

  • 高效的内存存储
  • 自动数据清理
  • 数据新鲜度检查
  • 内存使用监控

架构组件

1. WebSocketManager

负责WebSocket连接的生命周期管理。

import { WebSocketManager } from '../services/WebSocketManager';

const wsManager = new WebSocketManager({
  url: 'wss://ws.pacifica.fi/ws',
  reconnectInterval: 5000,
  heartbeatInterval: 30000,
  maxReconnectAttempts: 10
});

2. PriceDataService

处理价格数据的订阅、解析和存储。

import { PriceDataService } from '../services/PriceDataService';

const priceService = new PriceDataService(wsManager, {
  symbols: ['BTC', 'ETH', 'SOL'],
  updateInterval: 1000,
  maxDataAge: 10000
});

3. OrderBookService

处理订单簿数据的订阅、解析和存储。

import { OrderBookService } from '../services/OrderBookService';

const orderBookService = new OrderBookService(wsManager, {
  symbols: ['BTC', 'ETH', 'SOL'],
  aggregationLevels: [1, 2, 5, 10],
  updateInterval: 1000,
  maxDataAge: 10000
});

4. MarketDataManager

统一的市场数据管理器,整合所有数据服务。

import { MarketDataManager } from '../core/MarketDataManager';

const marketDataManager = new MarketDataManager({
  websocket: { /* WebSocket配置 */ },
  priceData: { /* 价格数据配置 */ },
  orderBook: { /* 订单簿配置 */ },
  cleanupInterval: 60000,
  maxDataAge: 300000
});

数据模型

PriceData

interface PriceData {
  symbol: string;           // 交易对符号
  mark: string;            // 标记价格
  oracle: string;          // 预言机价格
  mid: string;             // 中间价格
  funding: string;         // 资金费率
  next_funding: string;    // 下一个资金费率
  open_interest: string;   // 未平仓量
  volume_24h: string;      // 24小时交易量
  yesterday_price: string; // 昨日价格
  timestamp: number;       // 时间戳
}

OrderBookData

interface OrderBookData {
  symbol: string;
  bids: OrderBookLevel[];  // 买单
  asks: OrderBookLevel[];  // 卖单
  timestamp: number;
}

interface OrderBookLevel {
  a: string;  // 总数量
  n: number;  // 订单数量
  p: string;  // 价格
}

使用方法

基本使用

import { MarketDataManager } from '../core/MarketDataManager';

// 创建配置
const config = {
  websocket: {
    url: 'wss://ws.pacifica.fi/ws',
    reconnectInterval: 5000,
    heartbeatInterval: 30000,
    maxReconnectAttempts: 10
  },
  priceData: {
    symbols: ['BTC', 'ETH', 'SOL'],
    updateInterval: 1000,
    maxDataAge: 10000
  },
  orderBook: {
    symbols: ['BTC', 'ETH', 'SOL'],
    aggregationLevels: [1, 2, 5, 10],
    updateInterval: 1000,
    maxDataAge: 10000
  },
  cleanupInterval: 60000,
  maxDataAge: 300000
};

// 创建管理器
const marketDataManager = new MarketDataManager(config);

// 设置事件监听器
marketDataManager.on('priceUpdate', (data) => {
  console.log(`价格更新: ${data.symbol} = $${data.getMarkPrice()}`);
});

marketDataManager.on('orderBookUpdate', (data) => {
  const bestBid = data.getBestBid();
  const bestAsk = data.getBestAsk();
  console.log(`订单簿更新: ${data.symbol} 买价=${bestBid} 卖价=${bestAsk}`);
});

// 启动管理器
await marketDataManager.start();

数据访问

// 获取价格数据
const price = marketDataManager.getLatestPrice('BTC');
const oraclePrice = marketDataManager.getLatestOraclePrice('BTC');
const fundingRate = marketDataManager.getFundingRate('BTC');

// 获取订单簿数据
const bestBid = marketDataManager.getBestBid('BTC');
const bestAsk = marketDataManager.getBestAsk('BTC');
const spread = marketDataManager.getSpread('BTC');

// 获取市场深度
const marketDepth = marketDataManager.getMarketDepth('BTC', 10);

// 检查数据新鲜度
const isFresh = marketDataManager.isPriceDataFresh('BTC');
const dataAge = marketDataManager.getPriceDataAge('BTC');

状态监控

// 获取状态信息
const status = marketDataManager.getStatus();
const statistics = marketDataManager.getStatistics();
const healthCheck = marketDataManager.getHealthCheck();

console.log('连接状态:', status.websocket.isConnected);
console.log('数据记录数:', status.priceData.dataCount);
console.log('健康状态:', healthCheck.status);

配置选项

WebSocket配置

interface WebSocketConfig {
  url: string;                    // WebSocket URL
  reconnectInterval: number;      // 重连间隔 (ms)
  heartbeatInterval: number;      // 心跳间隔 (ms)
  maxReconnectAttempts: number;   // 最大重连次数
}

价格数据配置

interface PriceDataServiceConfig {
  symbols: string[];              // 订阅的交易对
  updateInterval?: number;        // 更新间隔 (ms)
  maxDataAge?: number;           // 最大数据年龄 (ms)
}

订单簿配置

interface OrderBookServiceConfig {
  symbols: string[];              // 订阅的交易对
  aggregationLevels: number[];    // 聚合级别
  updateInterval?: number;        // 更新间隔 (ms)
  maxDataAge?: number;           // 最大数据年龄 (ms)
}

事件系统

连接事件

  • connected: WebSocket连接建立
  • disconnected: WebSocket连接断开
  • error: 连接错误
  • maxReconnectAttemptsReached: 达到最大重连次数

数据事件

  • priceUpdate: 价格数据更新
  • priceChange: 价格显著变化
  • orderBookUpdate: 订单簿数据更新
  • spreadChange: 价差显著变化
  • batchPriceUpdate: 批量价格更新

服务事件

  • serviceError: 服务错误
  • started: 服务启动
  • stopped: 服务停止

测试和调试

运行测试

# 测试WebSocket连接
npm run test-market-data

# 运行完整示例
npm run market-data-demo

# 显示状态
npx tsx examples/market-data-example.ts status

# 测试连接
npx tsx examples/market-data-example.ts test

调试模式

# 启用详细日志
DEBUG=* npm run test-market-data

性能优化

内存管理

  • 自动清理过期数据
  • 可配置的数据保留时间
  • 内存使用监控

网络优化

  • 智能重连机制
  • 心跳保活
  • 连接池管理

数据处理

  • 高效的数据结构
  • 批量更新处理
  • 异步事件处理

错误处理

连接错误

  • 自动重连机制
  • 指数退避策略
  • 连接状态监控

数据错误

  • 数据格式验证
  • 异常数据过滤
  • 错误日志记录

服务错误

  • 服务隔离
  • 错误传播
  • 优雅降级

监控和告警

健康检查

const healthCheck = marketDataManager.getHealthCheck();
if (healthCheck.status !== 'healthy') {
  console.warn('系统健康状态异常:', healthCheck.issues);
}

性能指标

const statistics = marketDataManager.getStatistics();
console.log('数据新鲜度:', statistics.priceData.freshDataCount);
console.log('平均数据年龄:', statistics.priceData.averageAge);

最佳实践

1. 配置优化

  • 根据需求调整聚合级别
  • 合理设置数据保留时间
  • 优化重连参数

2. 事件处理

  • 使用事件监听器处理数据更新
  • 避免阻塞事件循环
  • 合理处理错误事件

3. 内存管理

  • 定期清理过期数据
  • 监控内存使用情况
  • 避免内存泄漏

4. 错误处理

  • 实现优雅的错误处理
  • 记录详细的错误日志
  • 提供降级方案

故障排除

常见问题

Q: WebSocket连接失败? A: 检查网络连接和URL配置,确保防火墙允许WebSocket连接。

Q: 数据更新不及时? A: 检查数据新鲜度设置,调整maxDataAge参数。

Q: 内存使用过高? A: 调整cleanupInterval和maxDataAge参数,增加数据清理频率。

Q: 重连失败? A: 检查maxReconnectAttempts设置,增加重连次数或调整重连间隔。

调试技巧

  1. 启用详细日志记录
  2. 监控连接状态和数据流
  3. 检查网络延迟和稳定性
  4. 验证数据格式和完整性

扩展功能

自定义数据处理

可以扩展数据处理逻辑,添加自定义的数据转换和分析功能。

多数据源支持

可以扩展支持多个数据源,实现数据聚合和冗余。

数据持久化

可以添加数据持久化功能,将重要数据保存到数据库。

实时分析

可以集成实时数据分析功能,提供更高级的市场洞察。

总结

Pacifica市场数据模块提供了一个完整、高效的实时数据流解决方案,支持:

  • ✅ 实时价格和订单簿数据
  • ✅ 自动重连和错误恢复
  • ✅ 高效的内存数据管理
  • ✅ 灵活的事件系统
  • ✅ 完善的监控和调试功能
  • ✅ 高性能和可扩展性

该模块为Pacifica多账户交易系统提供了可靠的数据基础,支持实时交易决策和风险控制。