import { EventEmitter } from 'events'; import WebSocket from 'ws'; import { AsterListenKeyManager } from './listenKeyManager'; import { AsterTradingConfig } from './tradingTypes'; import { AsterUserStreamEvent, AsterUserStreamEvents, AsterListenKeyExpiredEvent, AsterAccountUpdateEvent, AsterOrderTradeUpdateEvent, AsterAccountConfigUpdateEvent } from './userStreamTypes'; export interface AsterUserStreamConfig { wsUrl: string; autoReconnect?: boolean; reconnectIntervalMs?: number; maxReconnectAttempts?: number; pingIntervalMs?: number; pongTimeoutMs?: number; } export class AsterUserStreamClient extends EventEmitter { private config: AsterUserStreamConfig; private tradingConfig: AsterTradingConfig; private listenKeyManager: AsterListenKeyManager; private ws: WebSocket | null = null; private isConnected: boolean = false; private reconnectAttempts: number = 0; private pingTimer: NodeJS.Timeout | null = null; private pongTimer: NodeJS.Timeout | null = null; private reconnectTimer: NodeJS.Timeout | null = null; constructor( userStreamConfig: AsterUserStreamConfig, tradingConfig: AsterTradingConfig ) { super(); this.config = { autoReconnect: true, reconnectIntervalMs: 5000, maxReconnectAttempts: 10, pingIntervalMs: 30000, pongTimeoutMs: 10000, ...userStreamConfig }; this.tradingConfig = tradingConfig; this.listenKeyManager = new AsterListenKeyManager(tradingConfig); } /** * 连接用户数据流 */ async connect(): Promise { if (this.ws) { console.log('⚠️ 用户数据流已连接'); return; } try { // 获取或创建 ListenKey let listenKey = this.listenKeyManager.getListenKey(); if (!listenKey) { listenKey = await this.listenKeyManager.createListenKey(); } // 构建 WebSocket URL const wsUrl = `${this.config.wsUrl}/ws/${listenKey}`; console.log(`🔌 连接用户数据流: ${wsUrl}`); // 创建 WebSocket 连接 this.ws = new WebSocket(wsUrl); this.ws.on('open', () => { console.log('✅ 用户数据流连接成功'); this.isConnected = true; this.reconnectAttempts = 0; this.emit('connect'); this.startPing(); }); this.ws.on('message', (data: WebSocket.Data) => { this.handleMessage(data); }); this.ws.on('close', (code: number, reason: Buffer) => { console.log(`🔌 用户数据流连接断开: ${code} ${reason.toString()}`); this.isConnected = false; this.stopPing(); this.emit('disconnect'); if (this.config.autoReconnect && this.reconnectAttempts < this.config.maxReconnectAttempts!) { this.scheduleReconnect(); } }); this.ws.on('error', (error: Error) => { console.error('❌ 用户数据流错误:', error); this.emit('error', error); }); this.ws.on('pong', () => { this.onPong(); }); } catch (error: any) { console.error('❌ 连接用户数据流失败:', error.message); this.emit('error', error); throw error; } } /** * 断开用户数据流连接 */ disconnect(): void { if (this.ws) { this.ws.close(); this.ws = null; } this.isConnected = false; this.stopPing(); this.stopReconnect(); } /** * 销毁客户端 */ async destroy(): Promise { this.disconnect(); await this.listenKeyManager.destroy(); } /** * 处理 WebSocket 消息 */ private handleMessage(data: WebSocket.Data): void { try { const message = JSON.parse(data.toString()) as AsterUserStreamEvent; switch (message.e) { case 'listenKeyExpired': this.handleListenKeyExpired(message as AsterListenKeyExpiredEvent); break; case 'ACCOUNT_UPDATE': this.handleAccountUpdate(message as AsterAccountUpdateEvent); break; case 'ORDER_TRADE_UPDATE': this.handleOrderTradeUpdate(message as AsterOrderTradeUpdateEvent); break; case 'ACCOUNT_CONFIG_UPDATE': this.handleAccountConfigUpdate(message as AsterAccountConfigUpdateEvent); break; default: console.log('📨 未知用户数据流事件:', message); } } catch (error: any) { console.error('❌ 解析用户数据流消息失败:', error.message); this.emit('error', error); } } /** * 处理 ListenKey 过期事件 */ private async handleListenKeyExpired(event: AsterListenKeyExpiredEvent): Promise { console.log('⚠️ ListenKey 已过期,重新创建...'); this.emit('listenKeyExpired', event); try { // 重新创建 ListenKey const newListenKey = await this.listenKeyManager.createListenKey(); // 重新连接 this.disconnect(); await this.connect(); this.emit('reconnect'); } catch (error: any) { console.error('❌ 重新创建 ListenKey 失败:', error.message); this.emit('error', error); } } /** * 处理账户更新事件 */ private handleAccountUpdate(event: AsterAccountUpdateEvent): void { console.log('📊 账户更新:', { 原因: event.a.m, 余额变化: event.a.B.length, 持仓变化: event.a.P.length, 时间: new Date(event.E).toISOString() }); this.emit('accountUpdate', event); } /** * 处理订单/交易更新事件 */ private handleOrderTradeUpdate(event: AsterOrderTradeUpdateEvent): void { const order = event.o; console.log('📋 订单更新:', { 交易对: order.s, 订单ID: order.i, 客户端订单ID: order.c, 方向: order.S, 类型: order.o, 状态: order.X, 执行类型: order.x, 数量: order.q, 价格: order.p, 时间: new Date(event.E).toISOString() }); this.emit('orderTradeUpdate', event); } /** * 处理账户配置更新事件 */ private handleAccountConfigUpdate(event: AsterAccountConfigUpdateEvent): void { if (event.ac) { console.log('⚙️ 交易对配置更新:', { 交易对: event.ac.s, 杠杆倍数: event.ac.l, 时间: new Date(event.E).toISOString() }); } if (event.ai) { console.log('⚙️ 账户配置更新:', { 联合保证金: event.ai.j, 时间: new Date(event.E).toISOString() }); } this.emit('accountConfigUpdate', event); } /** * 启动 Ping */ private startPing(): void { this.stopPing(); this.pingTimer = setInterval(() => { if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.ws.ping(); this.startPongTimeout(); } }, this.config.pingIntervalMs!); } /** * 停止 Ping */ private stopPing(): void { if (this.pingTimer) { clearInterval(this.pingTimer); this.pingTimer = null; } this.stopPongTimeout(); } /** * 启动 Pong 超时 */ private startPongTimeout(): void { this.stopPongTimeout(); this.pongTimer = setTimeout(() => { console.log('⚠️ Pong 超时,断开连接'); if (this.ws) { this.ws.terminate(); } }, this.config.pongTimeoutMs!); } /** * 停止 Pong 超时 */ private stopPongTimeout(): void { if (this.pongTimer) { clearTimeout(this.pongTimer); this.pongTimer = null; } } /** * 处理 Pong */ private onPong(): void { this.stopPongTimeout(); } /** * 安排重连 */ private scheduleReconnect(): void { this.stopReconnect(); this.reconnectAttempts++; const delay = this.config.reconnectIntervalMs! * this.reconnectAttempts; console.log(`🔄 ${delay / 1000} 秒后尝试重连 (${this.reconnectAttempts}/${this.config.maxReconnectAttempts})`); this.reconnectTimer = setTimeout(async () => { try { await this.connect(); } catch (error: any) { console.error('❌ 重连失败:', error.message); if (this.reconnectAttempts < this.config.maxReconnectAttempts!) { this.scheduleReconnect(); } else { console.error('❌ 达到最大重连次数,停止重连'); } } }, delay); } /** * 停止重连 */ private stopReconnect(): void { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } } /** * 获取连接状态 */ isStreamConnected(): boolean { return this.isConnected && this.ws?.readyState === WebSocket.OPEN; } /** * 获取 ListenKey */ getListenKey(): string | null { return this.listenKeyManager.getListenKey(); } /** * 手动延长 ListenKey */ async extendListenKey(): Promise { return this.listenKeyManager.extendListenKey(); } /** * 更新配置 */ updateConfig(newConfig: Partial): void { this.config = { ...this.config, ...newConfig }; } }