import WebSocket from 'ws' import { EventEmitter } from 'events' /** * 行情数据类型 */ export interface MarketData { symbol: string price: number volume: number change: number changePercent: number high: number low: number open: number close: number timestamp: number bid: number ask: number bidSize: number askSize: number } /** * 24小时行情数据 */ export interface Ticker24hr { symbol: string priceChange: string priceChangePercent: string weightedAvgPrice: string prevClosePrice: string lastPrice: string lastQty: string bidPrice: string bidQty: string askPrice: string askQty: string openPrice: string highPrice: string lowPrice: string volume: string quoteVolume: string openTime: number closeTime: number firstId: number lastId: number count: number } /** * K线数据 */ export interface KlineData { symbol: string interval: string openTime: number open: number high: number low: number close: number volume: number closeTime: number quoteVolume: number trades: number takerBuyBaseVolume: number takerBuyQuoteVolume: number } /** * 深度数据 */ export interface DepthData { symbol: string bids: [string, string][] // [price, quantity] asks: [string, string][] // [price, quantity] lastUpdateId: number } /** * 行情数据管理器 * 通过 WebSocket 实时更新内存中的行情信息 */ export class MarketDataManager extends EventEmitter { private ws: WebSocket | null = null private reconnectTimer: NodeJS.Timeout | null = null private pingTimer: NodeJS.Timeout | null = null private pongTimer: NodeJS.Timeout | null = null // 内存中的行情数据 private marketDataMap: Map = new Map() private ticker24hrMap: Map = new Map() private klineDataMap: Map> = new Map() // symbol -> interval -> klines private depthDataMap: Map = new Map() // 订阅的符号列表 private subscribedSymbols: Set = new Set() private subscribedIntervals: Set = new Set() // 配置 private wsUrl: string private reconnectInterval: number private maxReconnectAttempts: number private reconnectAttempts: number = 0 private isConnected: boolean = false private isReconnecting: boolean = false constructor( wsUrl: string = 'wss://fstream.binance.com/ws', reconnectInterval: number = 5000, maxReconnectAttempts: number = 10, ) { super() this.wsUrl = wsUrl this.reconnectInterval = reconnectInterval this.maxReconnectAttempts = maxReconnectAttempts } /** * 连接 WebSocket */ public connect(): Promise { return new Promise((resolve, reject) => { if (this.isConnected || this.isReconnecting) { resolve() return } this.isReconnecting = true try { this.ws = new WebSocket(this.wsUrl) this.ws.on('open', () => { console.log('📡 WebSocket 连接成功') this.isConnected = true this.isReconnecting = false this.reconnectAttempts = 0 this.startPingTimer() this.resubscribeAll() this.emit('connected') resolve() }) this.ws.on('message', (data: WebSocket.Data) => { this.handleMessage(data) }) this.ws.on('close', (code: number, reason: Buffer) => { console.log(`📡 WebSocket 连接关闭: ${code} - ${reason.toString()}`) this.isConnected = false this.stopPingTimer() this.emit('disconnected', code, reason.toString()) this.scheduleReconnect() }) this.ws.on('error', (error: Error) => { console.error('📡 WebSocket 错误:', error) this.emit('error', error) reject(error) }) this.ws.on('pong', () => { this.handlePong() }) } catch (error) { this.isReconnecting = false reject(error) } }) } /** * 断开连接 */ public disconnect(): void { if (this.ws) { this.ws.close() this.ws = null } this.isConnected = false this.isReconnecting = false this.stopPingTimer() this.clearReconnectTimer() } /** * 订阅行情数据 * @param symbols 交易对列表 * @param intervals K线间隔列表(可选) */ public subscribeMarketData(symbols: string[], intervals: string[] = []): void { const streams: string[] = [] // 添加 24小时行情订阅 symbols.forEach(symbol => { const streamName = `${symbol.toLowerCase()}@ticker` streams.push(streamName) this.subscribedSymbols.add(symbol) }) // 添加 K线数据订阅 intervals.forEach(interval => { symbols.forEach(symbol => { const streamName = `${symbol.toLowerCase()}@kline_${interval}` streams.push(streamName) this.subscribedIntervals.add(interval) }) }) // 添加深度数据订阅 symbols.forEach(symbol => { const streamName = `${symbol.toLowerCase()}@depth20@100ms` streams.push(streamName) }) if (streams.length > 0) { this.subscribe(streams) } } /** * 取消订阅 * @param symbols 交易对列表 */ public unsubscribeMarketData(symbols: string[]): void { const streams: string[] = [] symbols.forEach(symbol => { const tickerStream = `${symbol.toLowerCase()}@ticker` const depthStream = `${symbol.toLowerCase()}@depth20@100ms` streams.push(tickerStream, depthStream) this.subscribedSymbols.delete(symbol) this.marketDataMap.delete(symbol) this.ticker24hrMap.delete(symbol) this.depthDataMap.delete(symbol) }) if (streams.length > 0) { this.unsubscribe(streams) } } /** * 获取行情数据 * @param symbol 交易对 */ public getMarketData(symbol: string): MarketData | null { return this.marketDataMap.get(symbol) || null } /** * 获取所有行情数据 */ public getAllMarketData(): Map { return new Map(this.marketDataMap) } /** * 获取24小时行情数据 * @param symbol 交易对 */ public getTicker24hr(symbol: string): Ticker24hr | null { return this.ticker24hrMap.get(symbol) || null } /** * 获取所有24小时行情数据 */ public getAllTicker24hr(): Map { return new Map(this.ticker24hrMap) } /** * 获取K线数据 * @param symbol 交易对 * @param interval 时间间隔 * @param limit 数量限制 */ public getKlineData(symbol: string, interval: string, limit: number = 100): KlineData[] { const symbolKlines = this.klineDataMap.get(symbol) if (!symbolKlines) return [] const klines = symbolKlines.get(interval) if (!klines) return [] return klines.slice(-limit) } /** * 获取深度数据 * @param symbol 交易对 */ public getDepthData(symbol: string): DepthData | null { return this.depthDataMap.get(symbol) || null } /** * 获取订阅的符号列表 */ public getSubscribedSymbols(): string[] { return Array.from(this.subscribedSymbols) } /** * 检查是否已连接 */ public isConnectedToWebSocket(): boolean { return this.isConnected } /** * 处理 WebSocket 消息 */ private handleMessage(data: WebSocket.Data): void { try { const message = JSON.parse(data.toString()) if (message.e === '24hrTicker') { this.handleTicker24hr(message) } else if (message.e === 'kline') { this.handleKlineData(message) } else if (message.e === 'depthUpdate') { this.handleDepthData(message) } else if (message.result === null && message.id) { // 订阅确认消息 console.log('✅ 订阅成功:', message) } else if (message.pong) { // Pong 响应 this.handlePong() } } catch (error) { console.error('❌ 解析 WebSocket 消息失败:', error) } } /** * 处理24小时行情数据 */ private handleTicker24hr(data: any): void { const symbol = data.s const tickerData: Ticker24hr = { symbol: data.s, priceChange: data.P, priceChangePercent: data.P, weightedAvgPrice: data.w, prevClosePrice: data.x, lastPrice: data.c, lastQty: data.Q, bidPrice: data.b, bidQty: data.B, askPrice: data.a, askQty: data.A, openPrice: data.o, highPrice: data.h, lowPrice: data.l, volume: data.v, quoteVolume: data.q, openTime: data.O, closeTime: data.C, firstId: data.F, lastId: data.L, count: data.n, } this.ticker24hrMap.set(symbol, tickerData) // 转换为 MarketData 格式 const marketData: MarketData = { symbol: symbol, price: parseFloat(data.c), volume: parseFloat(data.v), change: parseFloat(data.P), changePercent: parseFloat(data.P), high: parseFloat(data.h), low: parseFloat(data.l), open: parseFloat(data.o), close: parseFloat(data.c), timestamp: data.E, bid: parseFloat(data.b), ask: parseFloat(data.a), bidSize: parseFloat(data.B), askSize: parseFloat(data.A), } this.marketDataMap.set(symbol, marketData) this.emit('ticker24hr', tickerData) this.emit('marketData', marketData) } /** * 处理K线数据 */ private handleKlineData(data: any): void { const symbol = data.s const interval = data.k.i const kline = data.k const klineData: KlineData = { symbol: symbol, interval: interval, openTime: kline.t, open: parseFloat(kline.o), high: parseFloat(kline.h), low: parseFloat(kline.l), close: parseFloat(kline.c), volume: parseFloat(kline.v), closeTime: kline.T, quoteVolume: parseFloat(kline.q), trades: kline.n, takerBuyBaseVolume: parseFloat(kline.V), takerBuyQuoteVolume: parseFloat(kline.Q), } // 初始化数据结构 if (!this.klineDataMap.has(symbol)) { this.klineDataMap.set(symbol, new Map()) } const symbolKlines = this.klineDataMap.get(symbol)! if (!symbolKlines.has(interval)) { symbolKlines.set(interval, []) } const klines = symbolKlines.get(interval)! // 更新或添加K线数据 const existingIndex = klines.findIndex(k => k.openTime === klineData.openTime) if (existingIndex >= 0) { klines[existingIndex] = klineData } else { klines.push(klineData) // 保持最多1000条记录 if (klines.length > 1000) { klines.splice(0, klines.length - 1000) } } this.emit('kline', klineData) } /** * 处理深度数据 */ private handleDepthData(data: any): void { const symbol = data.s const depthData: DepthData = { symbol: symbol, bids: data.b, asks: data.a, lastUpdateId: data.u, } this.depthDataMap.set(symbol, depthData) this.emit('depth', depthData) } /** * 发送订阅请求 */ private subscribe(streams: string[]): void { if (!this.ws || !this.isConnected) { console.warn('⚠️ WebSocket 未连接,无法订阅') return } const message = { method: 'SUBSCRIBE', params: streams, id: Date.now(), } this.ws.send(JSON.stringify(message)) console.log('📡 订阅行情数据:', streams) } /** * 发送取消订阅请求 */ private unsubscribe(streams: string[]): void { if (!this.ws || !this.isConnected) { return } const message = { method: 'UNSUBSCRIBE', params: streams, id: Date.now(), } this.ws.send(JSON.stringify(message)) console.log('📡 取消订阅:', streams) } /** * 重新订阅所有数据 */ private resubscribeAll(): void { if (this.subscribedSymbols.size > 0) { const symbols = Array.from(this.subscribedSymbols) const intervals = Array.from(this.subscribedIntervals) this.subscribeMarketData(symbols, intervals) } } /** * 启动 Ping 定时器 */ private startPingTimer(): void { this.pingTimer = setInterval(() => { if (this.ws && this.isConnected) { this.ws.ping() this.startPongTimer() } }, 30000) // 每30秒发送一次 ping } /** * 停止 Ping 定时器 */ private stopPingTimer(): void { if (this.pingTimer) { clearInterval(this.pingTimer) this.pingTimer = null } } /** * 启动 Pong 超时定时器 */ private startPongTimer(): void { this.pongTimer = setTimeout(() => { console.warn('⚠️ Pong 超时,重新连接 WebSocket') this.disconnect() this.scheduleReconnect() }, 10000) // 10秒超时 } /** * 处理 Pong 响应 */ private handlePong(): void { if (this.pongTimer) { clearTimeout(this.pongTimer) this.pongTimer = null } } /** * 安排重连 */ private scheduleReconnect(): void { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('❌ 达到最大重连次数,停止重连') this.emit('maxReconnectAttemptsReached') return } this.clearReconnectTimer() this.reconnectTimer = setTimeout(() => { this.reconnectAttempts++ console.log(`🔄 尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`) this.connect().catch(error => { console.error('❌ 重连失败:', error) }) }, this.reconnectInterval) } /** * 清除重连定时器 */ private clearReconnectTimer(): void { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer) this.reconnectTimer = null } } }