marketDataManager.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. import WebSocket from 'ws'
  2. import { EventEmitter } from 'events'
  3. /**
  4. * 行情数据类型
  5. */
  6. export interface MarketData {
  7. symbol: string
  8. price: number
  9. volume: number
  10. change: number
  11. changePercent: number
  12. high: number
  13. low: number
  14. open: number
  15. close: number
  16. timestamp: number
  17. bid: number
  18. ask: number
  19. bidSize: number
  20. askSize: number
  21. }
  22. /**
  23. * 24小时行情数据
  24. */
  25. export interface Ticker24hr {
  26. symbol: string
  27. priceChange: string
  28. priceChangePercent: string
  29. weightedAvgPrice: string
  30. prevClosePrice: string
  31. lastPrice: string
  32. lastQty: string
  33. bidPrice: string
  34. bidQty: string
  35. askPrice: string
  36. askQty: string
  37. openPrice: string
  38. highPrice: string
  39. lowPrice: string
  40. volume: string
  41. quoteVolume: string
  42. openTime: number
  43. closeTime: number
  44. firstId: number
  45. lastId: number
  46. count: number
  47. }
  48. /**
  49. * K线数据
  50. */
  51. export interface KlineData {
  52. symbol: string
  53. interval: string
  54. openTime: number
  55. open: number
  56. high: number
  57. low: number
  58. close: number
  59. volume: number
  60. closeTime: number
  61. quoteVolume: number
  62. trades: number
  63. takerBuyBaseVolume: number
  64. takerBuyQuoteVolume: number
  65. }
  66. /**
  67. * 深度数据
  68. */
  69. export interface DepthData {
  70. symbol: string
  71. bids: [string, string][] // [price, quantity]
  72. asks: [string, string][] // [price, quantity]
  73. lastUpdateId: number
  74. }
  75. /**
  76. * 行情数据管理器
  77. * 通过 WebSocket 实时更新内存中的行情信息
  78. */
  79. export class MarketDataManager extends EventEmitter {
  80. private ws: WebSocket | null = null
  81. private reconnectTimer: NodeJS.Timeout | null = null
  82. private pingTimer: NodeJS.Timeout | null = null
  83. private pongTimer: NodeJS.Timeout | null = null
  84. // 内存中的行情数据
  85. private marketDataMap: Map<string, MarketData> = new Map()
  86. private ticker24hrMap: Map<string, Ticker24hr> = new Map()
  87. private klineDataMap: Map<string, Map<string, KlineData[]>> = new Map() // symbol -> interval -> klines
  88. private depthDataMap: Map<string, DepthData> = new Map()
  89. // 订阅的符号列表
  90. private subscribedSymbols: Set<string> = new Set()
  91. private subscribedIntervals: Set<string> = new Set()
  92. // 配置
  93. private wsUrl: string
  94. private reconnectInterval: number
  95. private maxReconnectAttempts: number
  96. private reconnectAttempts: number = 0
  97. private isConnected: boolean = false
  98. private isReconnecting: boolean = false
  99. constructor(
  100. wsUrl: string = 'wss://fstream.binance.com/ws',
  101. reconnectInterval: number = 5000,
  102. maxReconnectAttempts: number = 10,
  103. ) {
  104. super()
  105. this.wsUrl = wsUrl
  106. this.reconnectInterval = reconnectInterval
  107. this.maxReconnectAttempts = maxReconnectAttempts
  108. }
  109. /**
  110. * 连接 WebSocket
  111. */
  112. public connect(): Promise<void> {
  113. return new Promise((resolve, reject) => {
  114. if (this.isConnected || this.isReconnecting) {
  115. resolve()
  116. return
  117. }
  118. this.isReconnecting = true
  119. try {
  120. this.ws = new WebSocket(this.wsUrl)
  121. this.ws.on('open', () => {
  122. console.log('📡 WebSocket 连接成功')
  123. this.isConnected = true
  124. this.isReconnecting = false
  125. this.reconnectAttempts = 0
  126. this.startPingTimer()
  127. this.resubscribeAll()
  128. this.emit('connected')
  129. resolve()
  130. })
  131. this.ws.on('message', (data: WebSocket.Data) => {
  132. this.handleMessage(data)
  133. })
  134. this.ws.on('close', (code: number, reason: Buffer) => {
  135. console.log(`📡 WebSocket 连接关闭: ${code} - ${reason.toString()}`)
  136. this.isConnected = false
  137. this.stopPingTimer()
  138. this.emit('disconnected', code, reason.toString())
  139. this.scheduleReconnect()
  140. })
  141. this.ws.on('error', (error: Error) => {
  142. console.error('📡 WebSocket 错误:', error)
  143. this.emit('error', error)
  144. reject(error)
  145. })
  146. this.ws.on('pong', () => {
  147. this.handlePong()
  148. })
  149. } catch (error) {
  150. this.isReconnecting = false
  151. reject(error)
  152. }
  153. })
  154. }
  155. /**
  156. * 断开连接
  157. */
  158. public disconnect(): void {
  159. if (this.ws) {
  160. this.ws.close()
  161. this.ws = null
  162. }
  163. this.isConnected = false
  164. this.isReconnecting = false
  165. this.stopPingTimer()
  166. this.clearReconnectTimer()
  167. }
  168. /**
  169. * 订阅行情数据
  170. * @param symbols 交易对列表
  171. * @param intervals K线间隔列表(可选)
  172. */
  173. public subscribeMarketData(symbols: string[], intervals: string[] = []): void {
  174. const streams: string[] = []
  175. // 添加 24小时行情订阅
  176. symbols.forEach(symbol => {
  177. const streamName = `${symbol.toLowerCase()}@ticker`
  178. streams.push(streamName)
  179. this.subscribedSymbols.add(symbol)
  180. })
  181. // 添加 K线数据订阅
  182. intervals.forEach(interval => {
  183. symbols.forEach(symbol => {
  184. const streamName = `${symbol.toLowerCase()}@kline_${interval}`
  185. streams.push(streamName)
  186. this.subscribedIntervals.add(interval)
  187. })
  188. })
  189. // 添加深度数据订阅
  190. symbols.forEach(symbol => {
  191. const streamName = `${symbol.toLowerCase()}@depth20@100ms`
  192. streams.push(streamName)
  193. })
  194. if (streams.length > 0) {
  195. this.subscribe(streams)
  196. }
  197. }
  198. /**
  199. * 取消订阅
  200. * @param symbols 交易对列表
  201. */
  202. public unsubscribeMarketData(symbols: string[]): void {
  203. const streams: string[] = []
  204. symbols.forEach(symbol => {
  205. const tickerStream = `${symbol.toLowerCase()}@ticker`
  206. const depthStream = `${symbol.toLowerCase()}@depth20@100ms`
  207. streams.push(tickerStream, depthStream)
  208. this.subscribedSymbols.delete(symbol)
  209. this.marketDataMap.delete(symbol)
  210. this.ticker24hrMap.delete(symbol)
  211. this.depthDataMap.delete(symbol)
  212. })
  213. if (streams.length > 0) {
  214. this.unsubscribe(streams)
  215. }
  216. }
  217. /**
  218. * 获取行情数据
  219. * @param symbol 交易对
  220. */
  221. public getMarketData(symbol: string): MarketData | null {
  222. return this.marketDataMap.get(symbol) || null
  223. }
  224. /**
  225. * 获取所有行情数据
  226. */
  227. public getAllMarketData(): Map<string, MarketData> {
  228. return new Map(this.marketDataMap)
  229. }
  230. /**
  231. * 获取24小时行情数据
  232. * @param symbol 交易对
  233. */
  234. public getTicker24hr(symbol: string): Ticker24hr | null {
  235. return this.ticker24hrMap.get(symbol) || null
  236. }
  237. /**
  238. * 获取所有24小时行情数据
  239. */
  240. public getAllTicker24hr(): Map<string, Ticker24hr> {
  241. return new Map(this.ticker24hrMap)
  242. }
  243. /**
  244. * 获取K线数据
  245. * @param symbol 交易对
  246. * @param interval 时间间隔
  247. * @param limit 数量限制
  248. */
  249. public getKlineData(symbol: string, interval: string, limit: number = 100): KlineData[] {
  250. const symbolKlines = this.klineDataMap.get(symbol)
  251. if (!symbolKlines) return []
  252. const klines = symbolKlines.get(interval)
  253. if (!klines) return []
  254. return klines.slice(-limit)
  255. }
  256. /**
  257. * 获取深度数据
  258. * @param symbol 交易对
  259. */
  260. public getDepthData(symbol: string): DepthData | null {
  261. return this.depthDataMap.get(symbol) || null
  262. }
  263. /**
  264. * 获取订阅的符号列表
  265. */
  266. public getSubscribedSymbols(): string[] {
  267. return Array.from(this.subscribedSymbols)
  268. }
  269. /**
  270. * 检查是否已连接
  271. */
  272. public isConnectedToWebSocket(): boolean {
  273. return this.isConnected
  274. }
  275. /**
  276. * 处理 WebSocket 消息
  277. */
  278. private handleMessage(data: WebSocket.Data): void {
  279. try {
  280. const message = JSON.parse(data.toString())
  281. if (message.e === '24hrTicker') {
  282. this.handleTicker24hr(message)
  283. } else if (message.e === 'kline') {
  284. this.handleKlineData(message)
  285. } else if (message.e === 'depthUpdate') {
  286. this.handleDepthData(message)
  287. } else if (message.result === null && message.id) {
  288. // 订阅确认消息
  289. console.log('✅ 订阅成功:', message)
  290. } else if (message.pong) {
  291. // Pong 响应
  292. this.handlePong()
  293. }
  294. } catch (error) {
  295. console.error('❌ 解析 WebSocket 消息失败:', error)
  296. }
  297. }
  298. /**
  299. * 处理24小时行情数据
  300. */
  301. private handleTicker24hr(data: any): void {
  302. const symbol = data.s
  303. const tickerData: Ticker24hr = {
  304. symbol: data.s,
  305. priceChange: data.P,
  306. priceChangePercent: data.P,
  307. weightedAvgPrice: data.w,
  308. prevClosePrice: data.x,
  309. lastPrice: data.c,
  310. lastQty: data.Q,
  311. bidPrice: data.b,
  312. bidQty: data.B,
  313. askPrice: data.a,
  314. askQty: data.A,
  315. openPrice: data.o,
  316. highPrice: data.h,
  317. lowPrice: data.l,
  318. volume: data.v,
  319. quoteVolume: data.q,
  320. openTime: data.O,
  321. closeTime: data.C,
  322. firstId: data.F,
  323. lastId: data.L,
  324. count: data.n,
  325. }
  326. this.ticker24hrMap.set(symbol, tickerData)
  327. // 转换为 MarketData 格式
  328. const marketData: MarketData = {
  329. symbol: symbol,
  330. price: parseFloat(data.c),
  331. volume: parseFloat(data.v),
  332. change: parseFloat(data.P),
  333. changePercent: parseFloat(data.P),
  334. high: parseFloat(data.h),
  335. low: parseFloat(data.l),
  336. open: parseFloat(data.o),
  337. close: parseFloat(data.c),
  338. timestamp: data.E,
  339. bid: parseFloat(data.b),
  340. ask: parseFloat(data.a),
  341. bidSize: parseFloat(data.B),
  342. askSize: parseFloat(data.A),
  343. }
  344. this.marketDataMap.set(symbol, marketData)
  345. this.emit('ticker24hr', tickerData)
  346. this.emit('marketData', marketData)
  347. }
  348. /**
  349. * 处理K线数据
  350. */
  351. private handleKlineData(data: any): void {
  352. const symbol = data.s
  353. const interval = data.k.i
  354. const kline = data.k
  355. const klineData: KlineData = {
  356. symbol: symbol,
  357. interval: interval,
  358. openTime: kline.t,
  359. open: parseFloat(kline.o),
  360. high: parseFloat(kline.h),
  361. low: parseFloat(kline.l),
  362. close: parseFloat(kline.c),
  363. volume: parseFloat(kline.v),
  364. closeTime: kline.T,
  365. quoteVolume: parseFloat(kline.q),
  366. trades: kline.n,
  367. takerBuyBaseVolume: parseFloat(kline.V),
  368. takerBuyQuoteVolume: parseFloat(kline.Q),
  369. }
  370. // 初始化数据结构
  371. if (!this.klineDataMap.has(symbol)) {
  372. this.klineDataMap.set(symbol, new Map())
  373. }
  374. const symbolKlines = this.klineDataMap.get(symbol)!
  375. if (!symbolKlines.has(interval)) {
  376. symbolKlines.set(interval, [])
  377. }
  378. const klines = symbolKlines.get(interval)!
  379. // 更新或添加K线数据
  380. const existingIndex = klines.findIndex(k => k.openTime === klineData.openTime)
  381. if (existingIndex >= 0) {
  382. klines[existingIndex] = klineData
  383. } else {
  384. klines.push(klineData)
  385. // 保持最多1000条记录
  386. if (klines.length > 1000) {
  387. klines.splice(0, klines.length - 1000)
  388. }
  389. }
  390. this.emit('kline', klineData)
  391. }
  392. /**
  393. * 处理深度数据
  394. */
  395. private handleDepthData(data: any): void {
  396. const symbol = data.s
  397. const depthData: DepthData = {
  398. symbol: symbol,
  399. bids: data.b,
  400. asks: data.a,
  401. lastUpdateId: data.u,
  402. }
  403. this.depthDataMap.set(symbol, depthData)
  404. this.emit('depth', depthData)
  405. }
  406. /**
  407. * 发送订阅请求
  408. */
  409. private subscribe(streams: string[]): void {
  410. if (!this.ws || !this.isConnected) {
  411. console.warn('⚠️ WebSocket 未连接,无法订阅')
  412. return
  413. }
  414. const message = {
  415. method: 'SUBSCRIBE',
  416. params: streams,
  417. id: Date.now(),
  418. }
  419. this.ws.send(JSON.stringify(message))
  420. console.log('📡 订阅行情数据:', streams)
  421. }
  422. /**
  423. * 发送取消订阅请求
  424. */
  425. private unsubscribe(streams: string[]): void {
  426. if (!this.ws || !this.isConnected) {
  427. return
  428. }
  429. const message = {
  430. method: 'UNSUBSCRIBE',
  431. params: streams,
  432. id: Date.now(),
  433. }
  434. this.ws.send(JSON.stringify(message))
  435. console.log('📡 取消订阅:', streams)
  436. }
  437. /**
  438. * 重新订阅所有数据
  439. */
  440. private resubscribeAll(): void {
  441. if (this.subscribedSymbols.size > 0) {
  442. const symbols = Array.from(this.subscribedSymbols)
  443. const intervals = Array.from(this.subscribedIntervals)
  444. this.subscribeMarketData(symbols, intervals)
  445. }
  446. }
  447. /**
  448. * 启动 Ping 定时器
  449. */
  450. private startPingTimer(): void {
  451. this.pingTimer = setInterval(() => {
  452. if (this.ws && this.isConnected) {
  453. this.ws.ping()
  454. this.startPongTimer()
  455. }
  456. }, 30000) // 每30秒发送一次 ping
  457. }
  458. /**
  459. * 停止 Ping 定时器
  460. */
  461. private stopPingTimer(): void {
  462. if (this.pingTimer) {
  463. clearInterval(this.pingTimer)
  464. this.pingTimer = null
  465. }
  466. }
  467. /**
  468. * 启动 Pong 超时定时器
  469. */
  470. private startPongTimer(): void {
  471. this.pongTimer = setTimeout(() => {
  472. console.warn('⚠️ Pong 超时,重新连接 WebSocket')
  473. this.disconnect()
  474. this.scheduleReconnect()
  475. }, 10000) // 10秒超时
  476. }
  477. /**
  478. * 处理 Pong 响应
  479. */
  480. private handlePong(): void {
  481. if (this.pongTimer) {
  482. clearTimeout(this.pongTimer)
  483. this.pongTimer = null
  484. }
  485. }
  486. /**
  487. * 安排重连
  488. */
  489. private scheduleReconnect(): void {
  490. if (this.reconnectAttempts >= this.maxReconnectAttempts) {
  491. console.error('❌ 达到最大重连次数,停止重连')
  492. this.emit('maxReconnectAttemptsReached')
  493. return
  494. }
  495. this.clearReconnectTimer()
  496. this.reconnectTimer = setTimeout(() => {
  497. this.reconnectAttempts++
  498. console.log(`🔄 尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
  499. this.connect().catch(error => {
  500. console.error('❌ 重连失败:', error)
  501. })
  502. }, this.reconnectInterval)
  503. }
  504. /**
  505. * 清除重连定时器
  506. */
  507. private clearReconnectTimer(): void {
  508. if (this.reconnectTimer) {
  509. clearTimeout(this.reconnectTimer)
  510. this.reconnectTimer = null
  511. }
  512. }
  513. }