import WebSocket from 'ws' import { EventEmitter } from 'events' import { logger } from '../../utils/logger.js' import { Config } from '../../config/simpleEnv.js' import { httpClient } from '../../utils/httpClient.js' /** * 全局价格管理器 - 通过WebSocket维护实时价格数据 * * 功能: * 1. 自动连接到Pacifica WebSocket价格流 * 2. 在内存中维护所有交易对的实时价格 * 3. 提供同步价格查询接口 * 4. 自动重连和错误恢复 */ export class PriceManager extends EventEmitter { constructor(wsUrl) { super() this.ws = null this.priceCache = new Map() this.reconnectAttempts = 0 this.maxReconnectAttempts = 10 this.reconnectInterval = 5000 this.isConnecting = false this.pingInterval = null this.wsUrl = wsUrl || Config.pacifica.wsUrl || 'wss://ws.pacifica.fi/ws' // 启动连接 this.connect() // 5分钟清理一次过期数据 setInterval(() => this.cleanExpiredPrices(), 5 * 60 * 1000) // 30秒定期刷新价格数据(备用机制,主要依靠WebSocket实时更新) setInterval(() => this.refreshPrices(), 30 * 1000) } /** * 连接到WebSocket */ async connect() { if (this.isConnecting || (this.ws && this.ws.readyState === WebSocket.OPEN)) { return } this.isConnecting = true try { logger.info('连接到Pacifica价格WebSocket', { url: this.wsUrl }) this.ws = new WebSocket(this.wsUrl) this.ws.on('open', () => { logger.info('✅ Pacifica价格WebSocket连接成功') this.isConnecting = false this.reconnectAttempts = 0 // 订阅价格数据流 this.subscribeToPrices() // 启动心跳 this.startPing() this.emit('connected') }) this.ws.on('message', data => { try { const message = JSON.parse(data.toString()) this.handleMessage(message) } catch (error) { logger.warn('解析WebSocket消息失败', { error, data: data.toString() }) } }) this.ws.on('error', error => { logger.error('Pacifica价格WebSocket错误', { error }) this.emit('error', error) }) this.ws.on('close', (code, reason) => { logger.warn('Pacifica价格WebSocket连接关闭', { code, reason: reason.toString() }) this.isConnecting = false this.stopPing() // 自动重连 if (this.reconnectAttempts < this.maxReconnectAttempts) { setTimeout(() => { this.reconnectAttempts++ logger.info(`尝试重连Pacifica价格WebSocket (${this.reconnectAttempts}/${this.maxReconnectAttempts})`) this.connect() }, this.reconnectInterval) } else { logger.error('达到最大重连次数,停止重连') this.emit('maxReconnectsReached') } }) } catch (error) { logger.error('创建WebSocket连接失败', { error }) this.isConnecting = false this.emit('error', error) } } /** * 订阅价格数据流 */ subscribeToPrices() { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { logger.warn('WebSocket未连接,无法订阅价格数据') return } const subscribeMessage = { method: 'subscribe', params: { source: 'prices', }, } try { this.ws.send(JSON.stringify(subscribeMessage)) logger.info('📊 已订阅Pacifica价格数据流', subscribeMessage) } catch (error) { logger.error('发送价格订阅消息失败', { error }) } } /** * 处理WebSocket消息 */ handleMessage(message) { try { if (message.channel === 'prices' && Array.isArray(message.data)) { const timestamp = Date.now() for (const priceItem of message.data) { if (priceItem.symbol) { const priceData = { symbol: priceItem.symbol, mark: priceItem.mark || priceItem.markPrice || '0', oracle: priceItem.oracle || priceItem.oraclePrice || '0', funding_rate: priceItem.funding_rate || priceItem.fundingRate, volume_24h: priceItem.volume_24h || priceItem.volume24h, timestamp: priceItem.timestamp || timestamp, lastUpdate: timestamp, } this.priceCache.set(priceItem.symbol, priceData) // 触发价格更新事件 this.emit('priceUpdate', priceData) } } // 打印更新统计 if (message.data.length > 0) { logger.debug(`📊 价格缓存更新: ${message.data.length}个交易对`) } } } catch (error) { logger.warn('处理价格消息失败', { error, message }) } } /** * 启动心跳机制 */ startPing() { this.pingInterval = setInterval(() => { if (this.ws && this.ws.readyState === WebSocket.OPEN) { try { this.ws.ping() } catch (error) { logger.warn('发送心跳失败', { error }) } } }, 30000) // 30秒心跳 } /** * 停止心跳机制 */ stopPing() { if (this.pingInterval) { clearInterval(this.pingInterval) this.pingInterval = null } } /** * 获取指定symbol的当前价格(同步方法) */ getPrice(symbol) { // 支持多种symbol格式 const normalizedSymbol = this.normalizeSymbol(symbol) // 尝试精确匹配 let priceData = this.priceCache.get(normalizedSymbol) if (!priceData) { // 尝试模糊匹配 for (const [cachedSymbol, data] of this.priceCache.entries()) { if (cachedSymbol.includes(normalizedSymbol) || normalizedSymbol.includes(cachedSymbol)) { priceData = data break } } } if (priceData) { const price = parseFloat(priceData.mark || priceData.oracle || '0') if (price > 0) { return price } } logger.warn(`未找到${symbol}的价格数据`, { normalizedSymbol, availableSymbols: Array.from(this.priceCache.keys()).slice(0, 10), }) return 0 } /** * 标准化symbol名称 */ normalizeSymbol(symbol) { let normalized = symbol.toUpperCase() // 移除常见后缀 normalized = normalized.replace(/[-_](USD|USDT|PERP)$/, '') return normalized } /** * 获取所有可用的价格数据 */ getAllPrices() { return new Map(this.priceCache) } /** * 获取价格缓存状态 */ getStatus() { const lastUpdate = Math.max(...Array.from(this.priceCache.values()).map(p => p.lastUpdate), 0) || null return { connected: this.ws?.readyState === WebSocket.OPEN, symbolCount: this.priceCache.size, lastUpdate, reconnectAttempts: this.reconnectAttempts, } } /** * 清理过期的价格数据 */ cleanExpiredPrices() { const now = Date.now() const maxAge = 10 * 60 * 1000 // 10分钟 let removedCount = 0 for (const [symbol, priceData] of this.priceCache.entries()) { if (now - priceData.lastUpdate > maxAge) { this.priceCache.delete(symbol) removedCount++ } } if (removedCount > 0) { logger.info(`清理了${removedCount}个过期价格数据`) } } /** * 手动重连 */ reconnect() { if (this.ws) { this.ws.close() } this.reconnectAttempts = 0 this.connect() } /** * 定期刷新价格数据(备用机制) */ async refreshPrices() { try { // 如果WebSocket连接正常且数据新鲜,跳过刷新 const status = this.getStatus() if (status.connected && status.lastUpdate && Date.now() - status.lastUpdate < 60000) { return } logger.debug('定期刷新价格数据 (WebSocket补充)') // 通过REST API获取价格数据作为补充 const baseUrl = Config.pacifica.baseUrl || 'https://api.pacifica.fi' const response = await httpClient.get(`${baseUrl}/api/v1/info/prices`, { exchange: 'pacifica', timeout: 10000, retries: 1, }) if (response.ok && response.data) { const timestamp = Date.now() let updateCount = 0 // 处理价格数据 const pricesData = Array.isArray(response.data) ? response.data : response.data.data || [] for (const item of pricesData) { if (item.symbol && (item.mark || item.price)) { const priceData = { symbol: item.symbol, mark: item.mark || item.price || '0', oracle: item.oracle || item.mark || item.price || '0', funding_rate: item.funding_rate, volume_24h: item.volume_24h, timestamp: timestamp, lastUpdate: timestamp, } this.priceCache.set(item.symbol, priceData) updateCount++ } } if (updateCount > 0) { logger.debug(`✅ REST价格刷新: ${updateCount}个交易对`) } } } catch (error) { logger.debug('定期价格刷新失败', { error }) } } /** * 关闭连接 */ close() { this.stopPing() if (this.ws) { this.ws.close() this.ws = null } this.priceCache.clear() } } // 全局价格管理器实例 export const globalPriceManager = new PriceManager() // 导出便利函数 export function getGlobalPrice(symbol) { return globalPriceManager.getPrice(symbol) }