之前系统存在一个反复出现的问题:价格获取机制复杂且不稳定,需要多层fallback逻辑来处理API失败的情况。这导致:
使用Pacifica官方WebSocket API中的prices订阅,在内存中维护全局实时价格数据。
┌─────────────────────┐
│ Pacifica WebSocket │
│ wss://ws.pacifica.fi/ws
└─────────────────────┘
│
│ subscribe to "prices"
▼
┌─────────────────────┐
│ PriceManager │
│ (全局单例) │
│ ├─ WebSocket连接 │
│ ├─ 价格缓存Map │
│ ├─ 自动重连 │
│ └─ 清理过期数据 │
└─────────────────────┘
│
│ getPrice(symbol)
▼
┌─────────────────────┐
│ TradingEngine │
│ ├─ 使用率控制 │
│ ├─ 交易执行 │
│ └─ 价格查询 │
└─────────────────────┘
订阅请求:
{
"method": "subscribe",
"params": {
"source": "prices"
}
}
返回数据格式:
{
"channel": "prices",
"data": [
{
"symbol": "BTC",
"mark": "105473",
"oracle": "105473",
"funding_rate": "0.0001",
"volume_24h": "12345.67",
"timestamp": 1640995200000
}
]
}
import WebSocket from 'ws'
import { EventEmitter } from 'events'
export class PriceManager extends EventEmitter {
private ws: WebSocket | null = null
private priceCache: Map<string, PriceData> = new Map()
// 连接到WebSocket并订阅价格流
private async connect(): Promise<void> {
this.ws = new WebSocket(this.wsUrl)
this.ws.on('open', () => {
// 订阅价格数据
this.ws.send(
JSON.stringify({
method: 'subscribe',
params: { source: 'prices' },
}),
)
})
this.ws.on('message', data => {
const message = JSON.parse(data.toString())
if (message.channel === 'prices') {
// 更新价格缓存
for (const priceItem of message.data) {
this.priceCache.set(priceItem.symbol, priceItem)
}
}
})
}
// 同步获取价格
public getPrice(symbol: string): number {
const priceData = this.priceCache.get(this.normalizeSymbol(symbol))
return parseFloat(priceData?.mark || '0')
}
}
// 全局单例
export const globalPriceManager = new PriceManager()
import { globalPriceManager } from '../price/PriceManager.js'
export class TradingEngine {
private getCurrentPriceSync(symbol: string): number {
// 直接从全局价格管理器获取实时价格
const price = globalPriceManager.getPrice(symbol)
if (price > 0) {
return price
}
// 简单的fallback(仅在WebSocket未连接时使用)
if (symbol.includes('BTC')) {
return 65000 // BTC基准价格
}
return 0
}
}
在SystemOrchestrator中确保PriceManager在系统启动时自动初始化:
// src/modules/SystemOrchestrator.ts
import { globalPriceManager } from './price/PriceManager.js'
export class SystemOrchestrator {
async start(): Promise<void> {
// 价格管理器会自动启动WebSocket连接
logger.info('全局价格管理器已初始化')
// ... 其他服务启动
}
}
// 检查价格管理器状态
const status = globalPriceManager.getStatus()
console.log(status)
// {
// connected: true,
// symbolCount: 25,
// lastUpdate: 1640995200000,
// reconnectAttempts: 0
// }
// 监听价格更新事件
globalPriceManager.on('priceUpdate', priceData => {
console.log(`价格更新: ${priceData.symbol} = $${priceData.mark}`)
})
// 监听连接事件
globalPriceManager.on('connected', () => {
console.log('价格WebSocket已连接')
})
在生产环境中,将价格更新的日志级别设置为debug:
// 减少生产环境日志噪音
logger.debug(`价格缓存更新: ${message.data.length}个交易对`)
globalPriceManager.getStatus().connectedglobalPriceManager.reconnect()globalPriceManager.getStatus().lastUpdateglobalPriceManager.getAllPrices()查看可用symbol确保WebSocket网络连通性:
内存使用:
错误监控:
globalPriceManager的错误事件通过实现基于WebSocket的全局价格管理器,我们彻底解决了价格获取的稳定性问题,简化了系统架构,提高了交易执行的可靠性。
关键原则:
这个解决方案应该避免我们以后再次遇到类似的价格获取问题。