// PriceManager.js
  1. import WebSocket from 'ws'
  2. import { EventEmitter } from 'events'
  3. import { logger } from '../../utils/logger.js'
  4. import { Config } from '../../config/simpleEnv.js'
  5. import { httpClient } from '../../utils/httpClient.js'
  6. /**
  7. * 全局价格管理器 - 通过WebSocket维护实时价格数据
  8. *
  9. * 功能:
  10. * 1. 自动连接到Pacifica WebSocket价格流
  11. * 2. 在内存中维护所有交易对的实时价格
  12. * 3. 提供同步价格查询接口
  13. * 4. 自动重连和错误恢复
  14. */
  15. export class PriceManager extends EventEmitter {
  16. constructor(wsUrl) {
  17. super()
  18. this.ws = null
  19. this.priceCache = new Map()
  20. this.reconnectAttempts = 0
  21. this.maxReconnectAttempts = 10
  22. this.reconnectInterval = 5000
  23. this.isConnecting = false
  24. this.pingInterval = null
  25. this.wsUrl = wsUrl || Config.pacifica.wsUrl || 'wss://ws.pacifica.fi/ws'
  26. // 启动连接
  27. this.connect()
  28. // 5分钟清理一次过期数据
  29. setInterval(() => this.cleanExpiredPrices(), 5 * 60 * 1000)
  30. // 30秒定期刷新价格数据(备用机制,主要依靠WebSocket实时更新)
  31. setInterval(() => this.refreshPrices(), 30 * 1000)
  32. }
  33. /**
  34. * 连接到WebSocket
  35. */
  36. async connect() {
  37. if (this.isConnecting || (this.ws && this.ws.readyState === WebSocket.OPEN)) {
  38. return
  39. }
  40. this.isConnecting = true
  41. try {
  42. logger.info('连接到Pacifica价格WebSocket', { url: this.wsUrl })
  43. this.ws = new WebSocket(this.wsUrl)
  44. this.ws.on('open', () => {
  45. logger.info('✅ Pacifica价格WebSocket连接成功')
  46. this.isConnecting = false
  47. this.reconnectAttempts = 0
  48. // 订阅价格数据流
  49. this.subscribeToPrices()
  50. // 启动心跳
  51. this.startPing()
  52. this.emit('connected')
  53. })
  54. this.ws.on('message', data => {
  55. try {
  56. const message = JSON.parse(data.toString())
  57. this.handleMessage(message)
  58. } catch (error) {
  59. logger.warn('解析WebSocket消息失败', { error, data: data.toString() })
  60. }
  61. })
  62. this.ws.on('error', error => {
  63. logger.error('Pacifica价格WebSocket错误', { error })
  64. this.emit('error', error)
  65. })
  66. this.ws.on('close', (code, reason) => {
  67. logger.warn('Pacifica价格WebSocket连接关闭', { code, reason: reason.toString() })
  68. this.isConnecting = false
  69. this.stopPing()
  70. // 自动重连
  71. if (this.reconnectAttempts < this.maxReconnectAttempts) {
  72. setTimeout(() => {
  73. this.reconnectAttempts++
  74. logger.info(`尝试重连Pacifica价格WebSocket (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
  75. this.connect()
  76. }, this.reconnectInterval)
  77. } else {
  78. logger.error('达到最大重连次数,停止重连')
  79. this.emit('maxReconnectsReached')
  80. }
  81. })
  82. } catch (error) {
  83. logger.error('创建WebSocket连接失败', { error })
  84. this.isConnecting = false
  85. this.emit('error', error)
  86. }
  87. }
  88. /**
  89. * 订阅价格数据流
  90. */
  91. subscribeToPrices() {
  92. if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
  93. logger.warn('WebSocket未连接,无法订阅价格数据')
  94. return
  95. }
  96. const subscribeMessage = {
  97. method: 'subscribe',
  98. params: {
  99. source: 'prices',
  100. },
  101. }
  102. try {
  103. this.ws.send(JSON.stringify(subscribeMessage))
  104. logger.info('📊 已订阅Pacifica价格数据流', subscribeMessage)
  105. } catch (error) {
  106. logger.error('发送价格订阅消息失败', { error })
  107. }
  108. }
  109. /**
  110. * 处理WebSocket消息
  111. */
  112. handleMessage(message) {
  113. try {
  114. if (message.channel === 'prices' && Array.isArray(message.data)) {
  115. const timestamp = Date.now()
  116. for (const priceItem of message.data) {
  117. if (priceItem.symbol) {
  118. const priceData = {
  119. symbol: priceItem.symbol,
  120. mark: priceItem.mark || priceItem.markPrice || '0',
  121. oracle: priceItem.oracle || priceItem.oraclePrice || '0',
  122. funding_rate: priceItem.funding_rate || priceItem.fundingRate,
  123. volume_24h: priceItem.volume_24h || priceItem.volume24h,
  124. timestamp: priceItem.timestamp || timestamp,
  125. lastUpdate: timestamp,
  126. }
  127. this.priceCache.set(priceItem.symbol, priceData)
  128. // 触发价格更新事件
  129. this.emit('priceUpdate', priceData)
  130. }
  131. }
  132. // 打印更新统计
  133. if (message.data.length > 0) {
  134. logger.debug(`📊 价格缓存更新: ${message.data.length}个交易对`)
  135. }
  136. }
  137. } catch (error) {
  138. logger.warn('处理价格消息失败', { error, message })
  139. }
  140. }
  141. /**
  142. * 启动心跳机制
  143. */
  144. startPing() {
  145. this.pingInterval = setInterval(() => {
  146. if (this.ws && this.ws.readyState === WebSocket.OPEN) {
  147. try {
  148. this.ws.ping()
  149. } catch (error) {
  150. logger.warn('发送心跳失败', { error })
  151. }
  152. }
  153. }, 30000) // 30秒心跳
  154. }
  155. /**
  156. * 停止心跳机制
  157. */
  158. stopPing() {
  159. if (this.pingInterval) {
  160. clearInterval(this.pingInterval)
  161. this.pingInterval = null
  162. }
  163. }
  164. /**
  165. * 获取指定symbol的当前价格(同步方法)
  166. */
  167. getPrice(symbol) {
  168. // 支持多种symbol格式
  169. const normalizedSymbol = this.normalizeSymbol(symbol)
  170. // 尝试精确匹配
  171. let priceData = this.priceCache.get(normalizedSymbol)
  172. if (!priceData) {
  173. // 尝试模糊匹配
  174. for (const [cachedSymbol, data] of this.priceCache.entries()) {
  175. if (cachedSymbol.includes(normalizedSymbol) || normalizedSymbol.includes(cachedSymbol)) {
  176. priceData = data
  177. break
  178. }
  179. }
  180. }
  181. if (priceData) {
  182. const price = parseFloat(priceData.mark || priceData.oracle || '0')
  183. if (price > 0) {
  184. return price
  185. }
  186. }
  187. logger.warn(`未找到${symbol}的价格数据`, {
  188. normalizedSymbol,
  189. availableSymbols: Array.from(this.priceCache.keys()).slice(0, 10),
  190. })
  191. return 0
  192. }
  193. /**
  194. * 标准化symbol名称
  195. */
  196. normalizeSymbol(symbol) {
  197. let normalized = symbol.toUpperCase()
  198. // 移除常见后缀
  199. normalized = normalized.replace(/[-_](USD|USDT|PERP)$/, '')
  200. return normalized
  201. }
  202. /**
  203. * 获取所有可用的价格数据
  204. */
  205. getAllPrices() {
  206. return new Map(this.priceCache)
  207. }
  208. /**
  209. * 获取价格缓存状态
  210. */
  211. getStatus() {
  212. const lastUpdate = Math.max(...Array.from(this.priceCache.values()).map(p => p.lastUpdate), 0) || null
  213. return {
  214. connected: this.ws?.readyState === WebSocket.OPEN,
  215. symbolCount: this.priceCache.size,
  216. lastUpdate,
  217. reconnectAttempts: this.reconnectAttempts,
  218. }
  219. }
  220. /**
  221. * 清理过期的价格数据
  222. */
  223. cleanExpiredPrices() {
  224. const now = Date.now()
  225. const maxAge = 10 * 60 * 1000 // 10分钟
  226. let removedCount = 0
  227. for (const [symbol, priceData] of this.priceCache.entries()) {
  228. if (now - priceData.lastUpdate > maxAge) {
  229. this.priceCache.delete(symbol)
  230. removedCount++
  231. }
  232. }
  233. if (removedCount > 0) {
  234. logger.info(`清理了${removedCount}个过期价格数据`)
  235. }
  236. }
  237. /**
  238. * 手动重连
  239. */
  240. reconnect() {
  241. if (this.ws) {
  242. this.ws.close()
  243. }
  244. this.reconnectAttempts = 0
  245. this.connect()
  246. }
  247. /**
  248. * 定期刷新价格数据(备用机制)
  249. */
  250. async refreshPrices() {
  251. try {
  252. // 如果WebSocket连接正常且数据新鲜,跳过刷新
  253. const status = this.getStatus()
  254. if (status.connected && status.lastUpdate && Date.now() - status.lastUpdate < 60000) {
  255. return
  256. }
  257. logger.debug('定期刷新价格数据 (WebSocket补充)')
  258. // 通过REST API获取价格数据作为补充
  259. const baseUrl = Config.pacifica.baseUrl || 'https://api.pacifica.fi'
  260. const response = await httpClient.get(`${baseUrl}/api/v1/info/prices`, {
  261. exchange: 'pacifica',
  262. timeout: 10000,
  263. retries: 1,
  264. })
  265. if (response.ok && response.data) {
  266. const timestamp = Date.now()
  267. let updateCount = 0
  268. // 处理价格数据
  269. const pricesData = Array.isArray(response.data) ? response.data : response.data.data || []
  270. for (const item of pricesData) {
  271. if (item.symbol && (item.mark || item.price)) {
  272. const priceData = {
  273. symbol: item.symbol,
  274. mark: item.mark || item.price || '0',
  275. oracle: item.oracle || item.mark || item.price || '0',
  276. funding_rate: item.funding_rate,
  277. volume_24h: item.volume_24h,
  278. timestamp: timestamp,
  279. lastUpdate: timestamp,
  280. }
  281. this.priceCache.set(item.symbol, priceData)
  282. updateCount++
  283. }
  284. }
  285. if (updateCount > 0) {
  286. logger.debug(`✅ REST价格刷新: ${updateCount}个交易对`)
  287. }
  288. }
  289. } catch (error) {
  290. logger.debug('定期价格刷新失败', { error })
  291. }
  292. }
  293. /**
  294. * 关闭连接
  295. */
  296. close() {
  297. this.stopPing()
  298. if (this.ws) {
  299. this.ws.close()
  300. this.ws = null
  301. }
  302. this.priceCache.clear()
  303. }
  304. }
// Module-level shared PriceManager instance. It is constructed with no arguments when
// this module is first evaluated, so the constructor's connection attempt starts at
// import time and the WebSocket URL comes from the constructor's fallbacks.
export const globalPriceManager = new PriceManager()
  307. // 导出便利函数
  308. export function getGlobalPrice(symbol) {
  309. return globalPriceManager.getPrice(symbol)
  310. }