| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- import WebSocket from 'ws'
- import { EventEmitter } from 'events'
/**
 * Normalised, numeric market snapshot for a single trading pair.
 *
 * Instances are built by the manager from 24h ticker events, with the
 * string fields of the event converted to numbers.
 */
export interface MarketData {
  /** Trading pair identifier as reported by the stream. */
  symbol: string
  /** Event timestamp taken from the ticker event. */
  timestamp: number

  // --- prices ---
  /** Last traded price. */
  price: number
  /** Opening price of the ticker window. */
  open: number
  /** Highest price of the ticker window. */
  high: number
  /** Lowest price of the ticker window. */
  low: number
  /** Closing price (filled from the same value as `price`). */
  close: number

  // --- movement ---
  /** Price change over the ticker window. */
  change: number
  /** Price change over the ticker window, as a percentage. */
  changePercent: number
  /** Traded volume over the ticker window. */
  volume: number

  // --- top of book ---
  /** Best bid price. */
  bid: number
  /** Quantity available at the best bid. */
  bidSize: number
  /** Best ask price. */
  ask: number
  /** Quantity available at the best ask. */
  askSize: number
}
/**
 * Rolling 24-hour ticker for a single trading pair.
 *
 * Price and quantity fields are kept as the raw values delivered by the
 * stream (typed as strings); nothing here is parsed to a number.
 */
export interface Ticker24hr {
  /** Trading pair identifier. */
  symbol: string

  // --- price movement ---
  /** Price change over the window. */
  priceChange: string
  /** Price change over the window, as a percentage. */
  priceChangePercent: string
  /** Weighted average price over the window. */
  weightedAvgPrice: string
  /** Close price of the previous window. */
  prevClosePrice: string

  // --- last trade ---
  /** Last traded price. */
  lastPrice: string
  /** Quantity of the last trade. */
  lastQty: string

  // --- top of book ---
  /** Best bid price. */
  bidPrice: string
  /** Quantity at the best bid. */
  bidQty: string
  /** Best ask price. */
  askPrice: string
  /** Quantity at the best ask. */
  askQty: string

  // --- window statistics ---
  /** Opening price of the window. */
  openPrice: string
  /** Highest price of the window. */
  highPrice: string
  /** Lowest price of the window. */
  lowPrice: string
  /** Base-asset volume traded in the window. */
  volume: string
  /** Quote-asset volume traded in the window. */
  quoteVolume: string

  // --- window bounds and trade ids ---
  /** Start time of the statistics window. */
  openTime: number
  /** End time of the statistics window. */
  closeTime: number
  /** Id of the first trade in the window. */
  firstId: number
  /** Id of the last trade in the window. */
  lastId: number
  /** Number of trades in the window. */
  count: number
}
/**
 * A single candlestick (kline) for one trading pair and interval,
 * with all price and volume fields already converted to numbers.
 */
export interface KlineData {
  /** Trading pair identifier. */
  symbol: string
  /** Candle interval label as reported by the stream. */
  interval: string

  // --- time span ---
  /** Candle open time; also used to identify a candle when it is updated. */
  openTime: number
  /** Candle close time. */
  closeTime: number

  // --- prices ---
  /** Open price. */
  open: number
  /** Highest price. */
  high: number
  /** Lowest price. */
  low: number
  /** Close price. */
  close: number

  // --- activity ---
  /** Base-asset volume. */
  volume: number
  /** Quote-asset volume. */
  quoteVolume: number
  /** Number of trades in the candle. */
  trades: number
  /** Taker-buy base-asset volume. */
  takerBuyBaseVolume: number
  /** Taker-buy quote-asset volume. */
  takerBuyQuoteVolume: number
}
/**
 * Order-book depth snapshot for a single trading pair.
 *
 * Each level is a `[price, quantity]` pair kept as the raw string values
 * delivered by the stream.
 */
export interface DepthData {
  /** Trading pair identifier. */
  symbol: string
  /** Update id carried by the depth event this snapshot was built from. */
  lastUpdateId: number
  /** Bid levels. */
  bids: [price: string, quantity: string][]
  /** Ask levels. */
  asks: [price: string, quantity: string][]
}
/**
 * Market data manager.
 *
 * Maintains an in-memory snapshot of ticker, kline and depth data that is
 * updated in real time from a WebSocket stream, and re-emits every update
 * as an event.
 *
 * Events emitted:
 * - `connected`
 * - `disconnected` (code, reason)
 * - `error` (error) — only when at least one `error` listener is registered
 * - `ticker24hr` (Ticker24hr), `marketData` (MarketData)
 * - `kline` (KlineData)
 * - `depth` (DepthData)
 * - `maxReconnectAttemptsReached`
 */
export class MarketDataManager extends EventEmitter {
  private ws: WebSocket | null = null
  private reconnectTimer: NodeJS.Timeout | null = null
  private pingTimer: NodeJS.Timeout | null = null
  private pongTimer: NodeJS.Timeout | null = null

  // In-memory market data caches.
  private marketDataMap: Map<string, MarketData> = new Map()
  private ticker24hrMap: Map<string, Ticker24hr> = new Map()
  private klineDataMap: Map<string, Map<string, KlineData[]>> = new Map() // symbol -> interval -> klines
  private depthDataMap: Map<string, DepthData> = new Map()

  // Requested subscriptions; replayed after every (re)connect.
  private subscribedSymbols: Set<string> = new Set()
  private subscribedIntervals: Set<string> = new Set()

  // Configuration and connection state.
  private wsUrl: string
  private reconnectInterval: number
  private maxReconnectAttempts: number
  private reconnectAttempts: number = 0
  private isConnected: boolean = false
  private isReconnecting: boolean = false

  /**
   * @param wsUrl WebSocket endpoint to connect to.
   * @param reconnectInterval Delay in milliseconds before each reconnect attempt.
   * @param maxReconnectAttempts Number of reconnect attempts before giving up.
   */
  constructor(
    wsUrl: string = 'wss://fstream.binance.com/ws',
    reconnectInterval: number = 5000,
    maxReconnectAttempts: number = 10,
  ) {
    super()
    this.wsUrl = wsUrl
    this.reconnectInterval = reconnectInterval
    this.maxReconnectAttempts = maxReconnectAttempts
  }

  /**
   * Open the WebSocket connection.
   *
   * Resolves once the socket is open, or immediately when a connection is
   * already open or being established. Rejects when the socket reports an
   * error before it opens or when constructing the socket throws.
   */
  public connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      if (this.isConnected || this.isReconnecting) {
        resolve()
        return
      }
      this.isReconnecting = true

      let socket: WebSocket
      try {
        socket = new WebSocket(this.wsUrl)
      } catch (error) {
        this.isReconnecting = false
        reject(error)
        return
      }
      this.ws = socket

      socket.on('open', () => {
        // Ignore a socket that was replaced or explicitly disconnected meanwhile.
        if (this.ws !== socket) return
        console.log('📡 WebSocket 连接成功')
        this.isConnected = true
        this.isReconnecting = false
        this.reconnectAttempts = 0
        this.startPingTimer()
        this.resubscribeAll()
        this.emit('connected')
        resolve()
      })

      socket.on('message', (data: WebSocket.Data) => {
        this.handleMessage(data)
      })

      socket.on('close', (code: number, reason: Buffer) => {
        const reasonText = reason.toString()
        console.log(`📡 WebSocket 连接关闭: ${code} - ${reasonText}`)
        // Only the socket we currently own may change state or trigger a
        // reconnect. After disconnect() (or a pong timeout) `this.ws` no
        // longer points at this socket, so an intentional close does not
        // reconnect behind the caller's back.
        const isCurrent = this.ws === socket
        if (isCurrent) {
          this.ws = null
          this.isConnected = false
          // Must be reset here: a failed attempt never reaches 'open', and a
          // stale `true` would make every later connect() a no-op.
          this.isReconnecting = false
          this.stopPingTimer()
        }
        this.emit('disconnected', code, reasonText)
        if (isCurrent) {
          this.scheduleReconnect()
        }
      })

      socket.on('error', (error: Error) => {
        console.error('📡 WebSocket 错误:', error)
        // EventEmitter throws on an unhandled 'error' event; only forward it
        // when someone listens so the promise is still rejected below.
        if (this.listenerCount('error') > 0) {
          this.emit('error', error)
        }
        reject(error)
      })

      socket.on('pong', () => {
        this.handlePong()
      })
    })
  }

  /**
   * Close the connection and cancel all timers. No automatic reconnect is
   * scheduled afterwards; call `connect()` to connect again.
   */
  public disconnect(): void {
    this.releaseSocket(false)
    this.clearReconnectTimer()
  }

  /**
   * Subscribe to ticker, depth and (optionally) kline streams.
   *
   * The request is remembered and replayed after a reconnect. When the socket
   * is not connected yet, nothing is sent now; the streams are subscribed once
   * the connection opens.
   *
   * @param symbols Trading pairs to subscribe to.
   * @param intervals Kline intervals to subscribe to for every symbol (optional).
   */
  public subscribeMarketData(symbols: string[], intervals: string[] = []): void {
    const streams: string[] = []

    // 24h ticker streams.
    symbols.forEach(symbol => {
      streams.push(`${symbol.toLowerCase()}@ticker`)
      this.subscribedSymbols.add(symbol)
    })

    // Kline streams.
    intervals.forEach(interval => {
      symbols.forEach(symbol => {
        streams.push(`${symbol.toLowerCase()}@kline_${interval}`)
        this.subscribedIntervals.add(interval)
      })
    })

    // Depth streams.
    symbols.forEach(symbol => {
      streams.push(`${symbol.toLowerCase()}@depth20@100ms`)
    })

    if (streams.length > 0) {
      this.subscribe(streams)
    }
  }

  /**
   * Unsubscribe from all streams of the given symbols (ticker, depth and every
   * kline interval that was subscribed) and drop their cached data.
   *
   * @param symbols Trading pairs to unsubscribe from.
   */
  public unsubscribeMarketData(symbols: string[]): void {
    const intervals = Array.from(this.subscribedIntervals)
    const streams: string[] = []

    symbols.forEach(symbol => {
      const lower = symbol.toLowerCase()
      streams.push(`${lower}@ticker`, `${lower}@depth20@100ms`)
      intervals.forEach(interval => {
        streams.push(`${lower}@kline_${interval}`)
      })

      this.subscribedSymbols.delete(symbol)
      // Caches are keyed by the symbol reported in the stream payload, which
      // may differ in case from what the caller passed; clear both spellings.
      ;[symbol, symbol.toUpperCase()].forEach(key => {
        this.marketDataMap.delete(key)
        this.ticker24hrMap.delete(key)
        this.depthDataMap.delete(key)
        this.klineDataMap.delete(key)
      })
    })

    // With no symbols left, forget the intervals too so a later subscription
    // is not silently extended with stale kline intervals on reconnect.
    if (this.subscribedSymbols.size === 0) {
      this.subscribedIntervals.clear()
    }

    if (streams.length > 0) {
      this.unsubscribe(streams)
    }
  }

  /**
   * Get the latest market snapshot of a symbol.
   * @param symbol Trading pair.
   * @returns The snapshot, or `null` when none has been received.
   */
  public getMarketData(symbol: string): MarketData | null {
    return this.marketDataMap.get(symbol) || null
  }

  /**
   * Get a copy of the map holding all market snapshots.
   */
  public getAllMarketData(): Map<string, MarketData> {
    return new Map(this.marketDataMap)
  }

  /**
   * Get the latest 24h ticker of a symbol.
   * @param symbol Trading pair.
   * @returns The ticker, or `null` when none has been received.
   */
  public getTicker24hr(symbol: string): Ticker24hr | null {
    return this.ticker24hrMap.get(symbol) || null
  }

  /**
   * Get a copy of the map holding all 24h tickers.
   */
  public getAllTicker24hr(): Map<string, Ticker24hr> {
    return new Map(this.ticker24hrMap)
  }

  /**
   * Get the most recent cached klines of a symbol and interval.
   * @param symbol Trading pair.
   * @param interval Kline interval.
   * @param limit Maximum number of klines to return (newest ones are kept).
   * @returns A new array with at most `limit` klines; empty when nothing is
   *          cached or `limit` is not a positive number.
   */
  public getKlineData(symbol: string, interval: string, limit: number = 100): KlineData[] {
    // `slice(-0)` would return the whole history, so reject non-positive
    // (and NaN) limits explicitly.
    if (!(limit > 0)) return []
    const symbolKlines = this.klineDataMap.get(symbol)
    if (!symbolKlines) return []
    const klines = symbolKlines.get(interval)
    if (!klines) return []
    return klines.slice(-limit)
  }

  /**
   * Get the latest depth snapshot of a symbol.
   * @param symbol Trading pair.
   * @returns The snapshot, or `null` when none has been received.
   */
  public getDepthData(symbol: string): DepthData | null {
    return this.depthDataMap.get(symbol) || null
  }

  /**
   * Get the list of symbols that are currently requested for subscription.
   */
  public getSubscribedSymbols(): string[] {
    return Array.from(this.subscribedSymbols)
  }

  /**
   * Whether the WebSocket is currently open.
   */
  public isConnectedToWebSocket(): boolean {
    return this.isConnected
  }

  /**
   * Parse an incoming WebSocket frame and dispatch it by event type.
   * Parse and handler errors are logged and never propagate to the socket.
   */
  private handleMessage(data: WebSocket.Data): void {
    try {
      const message = JSON.parse(data.toString())
      // Valid JSON that is not an object (e.g. `null`) carries no event.
      if (message === null || typeof message !== 'object') {
        return
      }
      if (message.e === '24hrTicker') {
        this.handleTicker24hr(message)
      } else if (message.e === 'kline') {
        this.handleKlineData(message)
      } else if (message.e === 'depthUpdate') {
        this.handleDepthData(message)
      } else if (message.result === null && message.id) {
        // Acknowledgement of a SUBSCRIBE / UNSUBSCRIBE request.
        console.log('✅ 订阅成功:', message)
      } else if (message.pong) {
        // Application-level pong.
        this.handlePong()
      }
    } catch (error) {
      console.error('❌ 解析 WebSocket 消息失败:', error)
    }
  }

  /**
   * Store a 24h ticker event, derive the numeric market snapshot from it and
   * emit `ticker24hr` and `marketData`.
   *
   * NOTE(review): fields `x`, `b`, `B`, `a`, `A` are read from the payload as
   * before; if the configured endpoint does not send them, the corresponding
   * ticker fields are `undefined` and the numeric ones `NaN` — confirm
   * against the endpoint's ticker payload.
   */
  private handleTicker24hr(data: any): void {
    const symbol = data.s
    const tickerData: Ticker24hr = {
      symbol: data.s,
      priceChange: data.p, // lower-case `p` is the absolute change
      priceChangePercent: data.P, // upper-case `P` is the percentage
      weightedAvgPrice: data.w,
      prevClosePrice: data.x,
      lastPrice: data.c,
      lastQty: data.Q,
      bidPrice: data.b,
      bidQty: data.B,
      askPrice: data.a,
      askQty: data.A,
      openPrice: data.o,
      highPrice: data.h,
      lowPrice: data.l,
      volume: data.v,
      quoteVolume: data.q,
      openTime: data.O,
      closeTime: data.C,
      firstId: data.F,
      lastId: data.L,
      count: data.n,
    }
    this.ticker24hrMap.set(symbol, tickerData)

    // Numeric view of the same event.
    const marketData: MarketData = {
      symbol: symbol,
      price: parseFloat(data.c),
      volume: parseFloat(data.v),
      change: parseFloat(data.p),
      changePercent: parseFloat(data.P),
      high: parseFloat(data.h),
      low: parseFloat(data.l),
      open: parseFloat(data.o),
      close: parseFloat(data.c),
      timestamp: data.E,
      bid: parseFloat(data.b),
      ask: parseFloat(data.a),
      bidSize: parseFloat(data.B),
      askSize: parseFloat(data.A),
    }
    this.marketDataMap.set(symbol, marketData)

    this.emit('ticker24hr', tickerData)
    this.emit('marketData', marketData)
  }

  /**
   * Store a kline event and emit `kline`.
   *
   * A kline with an already known open time replaces the cached one;
   * otherwise it is appended and the history is trimmed to 1000 entries.
   */
  private handleKlineData(data: any): void {
    const symbol = data.s
    const kline = data.k
    const interval = kline.i
    const klineData: KlineData = {
      symbol: symbol,
      interval: interval,
      openTime: kline.t,
      open: parseFloat(kline.o),
      high: parseFloat(kline.h),
      low: parseFloat(kline.l),
      close: parseFloat(kline.c),
      volume: parseFloat(kline.v),
      closeTime: kline.T,
      quoteVolume: parseFloat(kline.q),
      trades: kline.n,
      takerBuyBaseVolume: parseFloat(kline.V),
      takerBuyQuoteVolume: parseFloat(kline.Q),
    }

    let symbolKlines = this.klineDataMap.get(symbol)
    if (!symbolKlines) {
      symbolKlines = new Map()
      this.klineDataMap.set(symbol, symbolKlines)
    }
    let klines = symbolKlines.get(interval)
    if (!klines) {
      klines = []
      symbolKlines.set(interval, klines)
    }

    // Updates almost always target the newest candle, so check it first and
    // fall back to a full scan only when it does not match.
    const last = klines[klines.length - 1]
    const existingIndex =
      last !== undefined && last.openTime === klineData.openTime
        ? klines.length - 1
        : klines.findIndex(k => k.openTime === klineData.openTime)

    if (existingIndex >= 0) {
      klines[existingIndex] = klineData
    } else {
      klines.push(klineData)
      // Keep at most 1000 entries per symbol and interval.
      if (klines.length > 1000) {
        klines.splice(0, klines.length - 1000)
      }
    }

    this.emit('kline', klineData)
  }

  /**
   * Store a depth event as the current snapshot and emit `depth`.
   */
  private handleDepthData(data: any): void {
    const symbol = data.s
    const depthData: DepthData = {
      symbol: symbol,
      bids: data.b,
      asks: data.a,
      lastUpdateId: data.u,
    }
    this.depthDataMap.set(symbol, depthData)
    this.emit('depth', depthData)
  }

  /**
   * Send a SUBSCRIBE request for the given streams.
   * Only warns when the socket is not connected.
   */
  private subscribe(streams: string[]): void {
    if (!this.ws || !this.isConnected) {
      console.warn('⚠️ WebSocket 未连接，无法订阅')
      return
    }
    const message = {
      method: 'SUBSCRIBE',
      params: streams,
      id: Date.now(),
    }
    this.ws.send(JSON.stringify(message))
    console.log('📡 订阅行情数据:', streams)
  }

  /**
   * Send an UNSUBSCRIBE request for the given streams.
   * Does nothing when the socket is not connected.
   */
  private unsubscribe(streams: string[]): void {
    if (!this.ws || !this.isConnected) {
      return
    }
    const message = {
      method: 'UNSUBSCRIBE',
      params: streams,
      id: Date.now(),
    }
    this.ws.send(JSON.stringify(message))
    console.log('📡 取消订阅:', streams)
  }

  /**
   * Replay all remembered subscriptions (used after every connect).
   */
  private resubscribeAll(): void {
    if (this.subscribedSymbols.size > 0) {
      const symbols = Array.from(this.subscribedSymbols)
      const intervals = Array.from(this.subscribedIntervals)
      this.subscribeMarketData(symbols, intervals)
    }
  }

  /**
   * Start the heartbeat: send a ping every 30 seconds and arm the pong
   * timeout for each one. Any previous heartbeat is stopped first.
   */
  private startPingTimer(): void {
    this.stopPingTimer()
    this.pingTimer = setInterval(() => {
      if (this.ws && this.isConnected) {
        this.ws.ping()
        this.startPongTimer()
      }
    }, 30000)
  }

  /**
   * Stop the heartbeat, including a pending pong timeout, so that a stopped
   * connection cannot trigger a reconnect later on.
   */
  private stopPingTimer(): void {
    if (this.pingTimer) {
      clearInterval(this.pingTimer)
      this.pingTimer = null
    }
    this.handlePong()
  }

  /**
   * Arm the pong timeout: when no pong arrives within 10 seconds the socket
   * is dropped and a reconnect is scheduled.
   */
  private startPongTimer(): void {
    // Never keep more than one pending timeout.
    this.handlePong()
    this.pongTimer = setTimeout(() => {
      this.pongTimer = null
      console.warn('⚠️ Pong 超时，重新连接 WebSocket')
      // The peer is unresponsive, so skip the close handshake.
      this.releaseSocket(true)
      this.scheduleReconnect()
    }, 10000)
  }

  /**
   * A pong arrived: cancel the pending pong timeout, if any.
   */
  private handlePong(): void {
    if (this.pongTimer) {
      clearTimeout(this.pongTimer)
      this.pongTimer = null
    }
  }

  /**
   * Schedule one reconnect attempt after `reconnectInterval` milliseconds,
   * replacing any attempt that is already pending. Emits
   * `maxReconnectAttemptsReached` instead once the attempt budget is used up.
   */
  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ 达到最大重连次数，停止重连')
      this.emit('maxReconnectAttemptsReached')
      return
    }
    this.clearReconnectTimer()
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.reconnectAttempts++
      console.log(`🔄 尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
      this.connect().catch(error => {
        console.error('❌ 重连失败:', error)
      })
    }, this.reconnectInterval)
  }

  /**
   * Cancel a pending reconnect attempt, if any.
   */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }

  /**
   * Detach from the current socket, reset the connection state, stop the
   * heartbeat and close the socket.
   *
   * The socket is detached before it is closed so that its `close` handler
   * sees it as no longer current and does not schedule a reconnect itself.
   *
   * @param force Terminate immediately instead of performing the close handshake.
   */
  private releaseSocket(force: boolean): void {
    const socket = this.ws
    this.ws = null
    this.isConnected = false
    this.isReconnecting = false
    this.stopPingTimer()
    if (socket) {
      if (force) {
        socket.terminate()
      } else {
        socket.close()
      }
    }
  }
}
|