| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- import WebSocket from 'ws'
- import { EventEmitter } from 'events'
- import { logger } from '../../utils/logger.js'
- import { Config } from '../../config/simpleEnv.js'
- import { httpClient } from '../../utils/httpClient.js'
/**
 * Global price manager - keeps real-time price data in memory via WebSocket.
 *
 * Responsibilities:
 * 1. Connects to the Pacifica WebSocket price stream on construction
 * 2. Caches the latest price record per symbol in memory
 * 3. Exposes a synchronous price lookup
 * 4. Reconnects automatically (bounded number of attempts) after the socket closes
 *
 * Emitted events: 'connected', 'priceUpdate' (priceData), 'maxReconnectsReached',
 * and 'error' (only when at least one 'error' listener is registered).
 */
export class PriceManager extends EventEmitter {
  /**
   * Creates the manager and immediately starts connecting.
   *
   * @param {string} [wsUrl] - WebSocket endpoint. Falls back to Config.pacifica.wsUrl,
   *   then to 'wss://ws.pacifica.fi/ws'.
   */
  constructor(wsUrl) {
    super()
    this.ws = null
    this.priceCache = new Map()
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = 10
    this.reconnectInterval = 5000
    this.isConnecting = false
    this.pingInterval = null
    // Timer handles are kept so close() can cancel them.
    this.reconnectTimer = null
    this.cleanupTimer = null
    this.refreshTimer = null
    this.closed = false
    this.wsUrl = wsUrl || Config.pacifica.wsUrl || 'wss://ws.pacifica.fi/ws'
    // connect() also arms the periodic cleanup/refresh timers.
    this.connect()
  }

  /**
   * Opens the WebSocket and wires its event handlers.
   * No-op when a connection attempt is in progress or the socket is already open.
   * Never rejects: failures are logged and reported through emitError().
   *
   * @returns {Promise<void>}
   */
  async connect() {
    if (this.isConnecting || (this.ws && this.ws.readyState === WebSocket.OPEN)) {
      return
    }
    this.closed = false
    this.startMaintenanceTimers()
    this.clearReconnectTimer()
    this.isConnecting = true
    try {
      logger.info('连接到Pacifica价格WebSocket', { url: this.wsUrl })
      const socket = new WebSocket(this.wsUrl)
      this.ws = socket
      // Every handler checks `this.ws !== socket` so that events from a socket that was
      // replaced by reconnect() or detached by close() cannot mutate current state.
      socket.on('open', () => {
        if (this.ws !== socket) return
        logger.info('✅ Pacifica价格WebSocket连接成功')
        this.isConnecting = false
        this.reconnectAttempts = 0
        // Subscribe to the price stream
        this.subscribeToPrices()
        // Start the heartbeat
        this.startPing()
        this.emit('connected')
      })
      socket.on('message', data => {
        if (this.ws !== socket) return
        try {
          const message = JSON.parse(data.toString())
          this.handleMessage(message)
        } catch (error) {
          logger.warn('解析WebSocket消息失败', { error, data: data.toString() })
        }
      })
      socket.on('error', error => {
        if (this.ws !== socket) return
        logger.error('Pacifica价格WebSocket错误', { error })
        this.emitError(error)
      })
      socket.on('close', (code, reason) => {
        logger.warn('Pacifica价格WebSocket连接关闭', { code, reason: String(reason ?? '') })
        // A detached socket was closed on purpose: do not reconnect for it.
        if (this.ws !== socket) return
        this.isConnecting = false
        this.stopPing()
        this.scheduleReconnect()
      })
    } catch (error) {
      logger.error('创建WebSocket连接失败', { error })
      this.isConnecting = false
      this.emitError(error)
    }
  }

  /**
   * Emits 'error' only when someone listens for it. An EventEmitter throws when 'error'
   * is emitted without a listener, which would otherwise crash the process from inside
   * a socket callback.
   *
   * @param {Error} error
   */
  emitError(error) {
    if (this.listenerCount('error') > 0) {
      this.emit('error', error)
    }
  }

  /**
   * Schedules one reconnect attempt after `reconnectInterval` ms, or emits
   * 'maxReconnectsReached' once `maxReconnectAttempts` has been used up.
   */
  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      logger.error('达到最大重连次数,停止重连')
      this.emit('maxReconnectsReached')
      return
    }
    this.clearReconnectTimer()
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.reconnectAttempts++
      logger.info(`尝试重连Pacifica价格WebSocket (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
      this.connect()
    }, this.reconnectInterval)
  }

  /**
   * Cancels a pending reconnect attempt, if any.
   */
  clearReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }

  /**
   * Arms the periodic maintenance timers (idempotent):
   * expired-entry cleanup every 5 minutes and the REST fallback refresh every 30 seconds.
   */
  startMaintenanceTimers() {
    if (!this.cleanupTimer) {
      this.cleanupTimer = setInterval(() => this.cleanExpiredPrices(), 5 * 60 * 1000)
    }
    if (!this.refreshTimer) {
      this.refreshTimer = setInterval(() => this.refreshPrices(), 30 * 1000)
    }
  }

  /**
   * Stops the periodic maintenance timers.
   */
  stopMaintenanceTimers() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer)
      this.cleanupTimer = null
    }
    if (this.refreshTimer) {
      clearInterval(this.refreshTimer)
      this.refreshTimer = null
    }
  }

  /**
   * Sends the subscription request for the 'prices' source.
   * Logs a warning and does nothing when the socket is not open.
   */
  subscribeToPrices() {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      logger.warn('WebSocket未连接,无法订阅价格数据')
      return
    }
    const subscribeMessage = {
      method: 'subscribe',
      params: {
        source: 'prices',
      },
    }
    try {
      this.ws.send(JSON.stringify(subscribeMessage))
      logger.info('📊 已订阅Pacifica价格数据流', subscribeMessage)
    } catch (error) {
      logger.error('发送价格订阅消息失败', { error })
    }
  }

  /**
   * Handles one parsed WebSocket message. Only messages with channel 'prices' and an
   * array payload are processed; each item with a symbol is cached and announced via
   * a 'priceUpdate' event.
   *
   * @param {object} message - parsed JSON message
   */
  handleMessage(message) {
    try {
      if (message?.channel !== 'prices' || !Array.isArray(message.data)) {
        return
      }
      const timestamp = Date.now()
      for (const priceItem of message.data) {
        if (!priceItem?.symbol) continue
        const priceData = {
          symbol: priceItem.symbol,
          mark: priceItem.mark || priceItem.markPrice || '0',
          oracle: priceItem.oracle || priceItem.oraclePrice || '0',
          funding_rate: priceItem.funding_rate || priceItem.fundingRate,
          volume_24h: priceItem.volume_24h || priceItem.volume24h,
          timestamp: priceItem.timestamp || timestamp,
          lastUpdate: timestamp,
        }
        this.priceCache.set(priceItem.symbol, priceData)
        // Notify listeners about the new price
        this.emit('priceUpdate', priceData)
      }
      // Log update statistics
      if (message.data.length > 0) {
        logger.debug(`📊 价格缓存更新: ${message.data.length}个交易对`)
      }
    } catch (error) {
      logger.warn('处理价格消息失败', { error, message })
    }
  }

  /**
   * Starts the 30-second heartbeat. Any previous heartbeat is stopped first so that
   * repeated calls never leave an orphaned interval behind.
   */
  startPing() {
    this.stopPing()
    this.pingInterval = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        try {
          this.ws.ping()
        } catch (error) {
          logger.warn('发送心跳失败', { error })
        }
      }
    }, 30000)
  }

  /**
   * Stops the heartbeat.
   */
  stopPing() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval)
      this.pingInterval = null
    }
  }

  /**
   * Returns the current price for a symbol (synchronous).
   * The mark price is preferred; the oracle price is used when the mark price is
   * missing or not a positive number.
   *
   * @param {string} symbol - e.g. 'BTC', 'btc-usd', 'ETH_PERP'
   * @returns {number} the price, or 0 when no usable price is cached
   */
  getPrice(symbol) {
    const normalizedSymbol = this.normalizeSymbol(symbol)
    // An empty symbol must not match: ''.includes-style matching would hit every entry.
    const priceData = normalizedSymbol ? this.findPriceData(normalizedSymbol) : undefined
    if (priceData) {
      // Parse each candidate separately: handleMessage stores the string '0' for a
      // missing mark price, and '0' is truthy, so `mark || oracle` never falls through.
      for (const candidate of [priceData.mark, priceData.oracle]) {
        const price = parseFloat(candidate)
        if (Number.isFinite(price) && price > 0) {
          return price
        }
      }
    }
    logger.warn(`未找到${symbol}的价格数据`, {
      normalizedSymbol,
      availableSymbols: Array.from(this.priceCache.keys()).slice(0, 10),
    })
    return 0
  }

  /**
   * Looks up the cache entry for an already-normalized symbol, from most to least
   * precise: exact key, then keys whose normalized form is equal, then substring match.
   *
   * @param {string} normalizedSymbol - non-empty output of normalizeSymbol()
   * @returns {object|undefined} cached price record
   */
  findPriceData(normalizedSymbol) {
    const exact = this.priceCache.get(normalizedSymbol)
    if (exact) {
      return exact
    }
    // e.g. request 'BTC' against cached key 'BTC-PERP'
    for (const [cachedSymbol, data] of this.priceCache) {
      if (this.normalizeSymbol(cachedSymbol) === normalizedSymbol) {
        return data
      }
    }
    // NOTE(review): substring matching is kept as a last resort for compatibility, but it
    // can resolve to a different asset (e.g. 'BTC' -> 'WBTC'); consider removing it.
    for (const [cachedSymbol, data] of this.priceCache) {
      if (typeof cachedSymbol !== 'string') continue
      if (cachedSymbol.includes(normalizedSymbol) || normalizedSymbol.includes(cachedSymbol)) {
        return data
      }
    }
    return undefined
  }

  /**
   * Normalizes a symbol: upper-cases it and strips one trailing
   * '-USD' / '-USDT' / '-PERP' suffix (separator '-' or '_').
   *
   * @param {string} symbol
   * @returns {string} normalized symbol; '' for non-string input
   */
  normalizeSymbol(symbol) {
    if (typeof symbol !== 'string') {
      return ''
    }
    return symbol.toUpperCase().replace(/[-_](USD|USDT|PERP)$/, '')
  }

  /**
   * Returns a shallow copy of the price cache.
   *
   * @returns {Map<string, object>}
   */
  getAllPrices() {
    return new Map(this.priceCache)
  }

  /**
   * Reports connection and cache status.
   *
   * @returns {{connected: boolean, symbolCount: number, lastUpdate: (number|null), reconnectAttempts: number}}
   *   `lastUpdate` is the newest cache write time in ms, or null when the cache is empty.
   */
  getStatus() {
    let latest = 0
    for (const { lastUpdate } of this.priceCache.values()) {
      if (lastUpdate > latest) {
        latest = lastUpdate
      }
    }
    return {
      connected: this.ws?.readyState === WebSocket.OPEN,
      symbolCount: this.priceCache.size,
      lastUpdate: latest || null,
      reconnectAttempts: this.reconnectAttempts,
    }
  }

  /**
   * Removes cache entries that have not been updated for more than 10 minutes.
   */
  cleanExpiredPrices() {
    const now = Date.now()
    const maxAge = 10 * 60 * 1000
    let removedCount = 0
    for (const [symbol, priceData] of this.priceCache.entries()) {
      if (now - priceData.lastUpdate > maxAge) {
        this.priceCache.delete(symbol)
        removedCount++
      }
    }
    if (removedCount > 0) {
      logger.info(`清理了${removedCount}个过期价格数据`)
    }
  }

  /**
   * Manual reconnect: drops the current socket, resets the attempt counter and
   * connects again right away.
   */
  reconnect() {
    const socket = this.ws
    // Detach first so the old socket's 'close'/'error' events are ignored and cannot
    // schedule a second reconnect or stop the new socket's heartbeat.
    this.ws = null
    this.isConnecting = false
    this.stopPing()
    this.clearReconnectTimer()
    if (socket) {
      socket.close()
    }
    this.reconnectAttempts = 0
    this.connect()
  }

  /**
   * Periodic REST refresh (fallback; the WebSocket stream is the primary source).
   * Skipped while the socket is open and the cache was updated within the last 60 seconds.
   * Failures are logged at debug level and never thrown.
   *
   * @returns {Promise<void>}
   */
  async refreshPrices() {
    if (this.closed) {
      return
    }
    try {
      const status = this.getStatus()
      if (status.connected && status.lastUpdate && Date.now() - status.lastUpdate < 60000) {
        return
      }
      logger.debug('定期刷新价格数据 (WebSocket补充)')
      const baseUrl = Config.pacifica.baseUrl || 'https://api.pacifica.fi'
      const response = await httpClient.get(`${baseUrl}/api/v1/info/prices`, {
        exchange: 'pacifica',
        timeout: 10000,
        retries: 1,
      })
      // close() may have run while the request was in flight; do not refill the cache.
      if (this.closed || !response.ok || !response.data) {
        return
      }
      const timestamp = Date.now()
      let updateCount = 0
      // The payload is either the array itself or wrapped as { data: [...] }
      const payload = Array.isArray(response.data) ? response.data : response.data.data
      const pricesData = Array.isArray(payload) ? payload : []
      for (const item of pricesData) {
        if (item?.symbol && (item.mark || item.price)) {
          const priceData = {
            symbol: item.symbol,
            mark: item.mark || item.price || '0',
            oracle: item.oracle || item.mark || item.price || '0',
            funding_rate: item.funding_rate,
            volume_24h: item.volume_24h,
            timestamp: timestamp,
            lastUpdate: timestamp,
          }
          this.priceCache.set(item.symbol, priceData)
          updateCount++
        }
      }
      if (updateCount > 0) {
        logger.debug(`✅ REST价格刷新: ${updateCount}个交易对`)
      }
    } catch (error) {
      logger.debug('定期价格刷新失败', { error })
    }
  }

  /**
   * Shuts the manager down: stops the heartbeat, the pending reconnect and the periodic
   * timers, closes the socket without triggering auto-reconnect, and clears the cache.
   * Calling connect() or reconnect() afterwards starts the manager again.
   */
  close() {
    this.closed = true
    this.isConnecting = false
    this.stopPing()
    this.clearReconnectTimer()
    this.stopMaintenanceTimers()
    const socket = this.ws
    // Detach before closing so the 'close' handler treats the socket as stale.
    this.ws = null
    if (socket) {
      socket.close()
    }
    this.priceCache.clear()
  }
}
// Shared module-level PriceManager instance, created when this module is first loaded.
const globalPriceManager = new PriceManager()
export { globalPriceManager }
/**
 * Convenience wrapper: looks up a symbol's price on the shared global manager.
 *
 * @param {string} symbol - symbol to look up
 * @returns {number} the value returned by globalPriceManager.getPrice(symbol)
 */
export function getGlobalPrice(symbol) {
  const price = globalPriceManager.getPrice(symbol)
  return price
}
|