| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- import { EventEmitter } from 'events';
- import WebSocket from 'ws';
- import { AsterListenKeyManager } from './listenKeyManager';
- import { AsterTradingConfig } from './tradingTypes';
- import {
- AsterUserStreamEvent,
- AsterUserStreamEvents,
- AsterListenKeyExpiredEvent,
- AsterAccountUpdateEvent,
- AsterOrderTradeUpdateEvent,
- AsterAccountConfigUpdateEvent
- } from './userStreamTypes';
- export interface AsterUserStreamConfig {
- wsUrl: string;
- autoReconnect?: boolean;
- reconnectIntervalMs?: number;
- maxReconnectAttempts?: number;
- pingIntervalMs?: number;
- pongTimeoutMs?: number;
- }
- export class AsterUserStreamClient extends EventEmitter {
- private config: AsterUserStreamConfig;
- private tradingConfig: AsterTradingConfig;
- private listenKeyManager: AsterListenKeyManager;
- private ws: WebSocket | null = null;
- private isConnected: boolean = false;
- private reconnectAttempts: number = 0;
- private pingTimer: NodeJS.Timeout | null = null;
- private pongTimer: NodeJS.Timeout | null = null;
- private reconnectTimer: NodeJS.Timeout | null = null;
- constructor(
- userStreamConfig: AsterUserStreamConfig,
- tradingConfig: AsterTradingConfig
- ) {
- super();
- this.config = {
- autoReconnect: true,
- reconnectIntervalMs: 5000,
- maxReconnectAttempts: 10,
- pingIntervalMs: 30000,
- pongTimeoutMs: 10000,
- ...userStreamConfig
- };
- this.tradingConfig = tradingConfig;
- this.listenKeyManager = new AsterListenKeyManager(tradingConfig);
- }
- /**
- * 连接用户数据流
- */
- async connect(): Promise<void> {
- if (this.ws) {
- console.log('⚠️ 用户数据流已连接');
- return;
- }
- try {
- // 获取或创建 ListenKey
- let listenKey = this.listenKeyManager.getListenKey();
- if (!listenKey) {
- listenKey = await this.listenKeyManager.createListenKey();
- }
- // 构建 WebSocket URL
- const wsUrl = `${this.config.wsUrl}/ws/${listenKey}`;
- console.log(`🔌 连接用户数据流: ${wsUrl}`);
- // 创建 WebSocket 连接
- this.ws = new WebSocket(wsUrl);
- this.ws.on('open', () => {
- console.log('✅ 用户数据流连接成功');
- this.isConnected = true;
- this.reconnectAttempts = 0;
- this.emit('connect');
- this.startPing();
- });
- this.ws.on('message', (data: WebSocket.Data) => {
- this.handleMessage(data);
- });
- this.ws.on('close', (code: number, reason: Buffer) => {
- console.log(`🔌 用户数据流连接断开: ${code} ${reason.toString()}`);
- this.isConnected = false;
- this.stopPing();
- this.emit('disconnect');
-
- if (this.config.autoReconnect && this.reconnectAttempts < this.config.maxReconnectAttempts!) {
- this.scheduleReconnect();
- }
- });
- this.ws.on('error', (error: Error) => {
- console.error('❌ 用户数据流错误:', error);
- this.emit('error', error);
- });
- this.ws.on('pong', () => {
- this.onPong();
- });
- } catch (error: any) {
- console.error('❌ 连接用户数据流失败:', error.message);
- this.emit('error', error);
- throw error;
- }
- }
- /**
- * 断开用户数据流连接
- */
- disconnect(): void {
- if (this.ws) {
- this.ws.close();
- this.ws = null;
- }
- this.isConnected = false;
- this.stopPing();
- this.stopReconnect();
- }
- /**
- * 销毁客户端
- */
- async destroy(): Promise<void> {
- this.disconnect();
- await this.listenKeyManager.destroy();
- }
- /**
- * 处理 WebSocket 消息
- */
- private handleMessage(data: WebSocket.Data): void {
- try {
- const message = JSON.parse(data.toString()) as AsterUserStreamEvent;
-
- switch (message.e) {
- case 'listenKeyExpired':
- this.handleListenKeyExpired(message as AsterListenKeyExpiredEvent);
- break;
- case 'ACCOUNT_UPDATE':
- this.handleAccountUpdate(message as AsterAccountUpdateEvent);
- break;
- case 'ORDER_TRADE_UPDATE':
- this.handleOrderTradeUpdate(message as AsterOrderTradeUpdateEvent);
- break;
- case 'ACCOUNT_CONFIG_UPDATE':
- this.handleAccountConfigUpdate(message as AsterAccountConfigUpdateEvent);
- break;
- default:
- console.log('📨 未知用户数据流事件:', message);
- }
- } catch (error: any) {
- console.error('❌ 解析用户数据流消息失败:', error.message);
- this.emit('error', error);
- }
- }
- /**
- * 处理 ListenKey 过期事件
- */
- private async handleListenKeyExpired(event: AsterListenKeyExpiredEvent): Promise<void> {
- console.log('⚠️ ListenKey 已过期,重新创建...');
- this.emit('listenKeyExpired', event);
-
- try {
- // 重新创建 ListenKey
- const newListenKey = await this.listenKeyManager.createListenKey();
-
- // 重新连接
- this.disconnect();
- await this.connect();
-
- this.emit('reconnect');
- } catch (error: any) {
- console.error('❌ 重新创建 ListenKey 失败:', error.message);
- this.emit('error', error);
- }
- }
- /**
- * 处理账户更新事件
- */
- private handleAccountUpdate(event: AsterAccountUpdateEvent): void {
- console.log('📊 账户更新:', {
- 原因: event.a.m,
- 余额变化: event.a.B.length,
- 持仓变化: event.a.P.length,
- 时间: new Date(event.E).toISOString()
- });
-
- this.emit('accountUpdate', event);
- }
- /**
- * 处理订单/交易更新事件
- */
- private handleOrderTradeUpdate(event: AsterOrderTradeUpdateEvent): void {
- const order = event.o;
- console.log('📋 订单更新:', {
- 交易对: order.s,
- 订单ID: order.i,
- 客户端订单ID: order.c,
- 方向: order.S,
- 类型: order.o,
- 状态: order.X,
- 执行类型: order.x,
- 数量: order.q,
- 价格: order.p,
- 时间: new Date(event.E).toISOString()
- });
-
- this.emit('orderTradeUpdate', event);
- }
- /**
- * 处理账户配置更新事件
- */
- private handleAccountConfigUpdate(event: AsterAccountConfigUpdateEvent): void {
- if (event.ac) {
- console.log('⚙️ 交易对配置更新:', {
- 交易对: event.ac.s,
- 杠杆倍数: event.ac.l,
- 时间: new Date(event.E).toISOString()
- });
- }
-
- if (event.ai) {
- console.log('⚙️ 账户配置更新:', {
- 联合保证金: event.ai.j,
- 时间: new Date(event.E).toISOString()
- });
- }
-
- this.emit('accountConfigUpdate', event);
- }
- /**
- * 启动 Ping
- */
- private startPing(): void {
- this.stopPing();
-
- this.pingTimer = setInterval(() => {
- if (this.ws && this.ws.readyState === WebSocket.OPEN) {
- this.ws.ping();
- this.startPongTimeout();
- }
- }, this.config.pingIntervalMs!);
- }
- /**
- * 停止 Ping
- */
- private stopPing(): void {
- if (this.pingTimer) {
- clearInterval(this.pingTimer);
- this.pingTimer = null;
- }
- this.stopPongTimeout();
- }
- /**
- * 启动 Pong 超时
- */
- private startPongTimeout(): void {
- this.stopPongTimeout();
-
- this.pongTimer = setTimeout(() => {
- console.log('⚠️ Pong 超时,断开连接');
- if (this.ws) {
- this.ws.terminate();
- }
- }, this.config.pongTimeoutMs!);
- }
- /**
- * 停止 Pong 超时
- */
- private stopPongTimeout(): void {
- if (this.pongTimer) {
- clearTimeout(this.pongTimer);
- this.pongTimer = null;
- }
- }
- /**
- * 处理 Pong
- */
- private onPong(): void {
- this.stopPongTimeout();
- }
- /**
- * 安排重连
- */
- private scheduleReconnect(): void {
- this.stopReconnect();
-
- this.reconnectAttempts++;
- const delay = this.config.reconnectIntervalMs! * this.reconnectAttempts;
-
- console.log(`🔄 ${delay / 1000} 秒后尝试重连 (${this.reconnectAttempts}/${this.config.maxReconnectAttempts})`);
-
- this.reconnectTimer = setTimeout(async () => {
- try {
- await this.connect();
- } catch (error: any) {
- console.error('❌ 重连失败:', error.message);
- if (this.reconnectAttempts < this.config.maxReconnectAttempts!) {
- this.scheduleReconnect();
- } else {
- console.error('❌ 达到最大重连次数,停止重连');
- }
- }
- }, delay);
- }
- /**
- * 停止重连
- */
- private stopReconnect(): void {
- if (this.reconnectTimer) {
- clearTimeout(this.reconnectTimer);
- this.reconnectTimer = null;
- }
- }
- /**
- * 获取连接状态
- */
- isStreamConnected(): boolean {
- return this.isConnected && this.ws?.readyState === WebSocket.OPEN;
- }
- /**
- * 获取 ListenKey
- */
- getListenKey(): string | null {
- return this.listenKeyManager.getListenKey();
- }
- /**
- * 手动延长 ListenKey
- */
- async extendListenKey(): Promise<void> {
- return this.listenKeyManager.extendListenKey();
- }
- /**
- * 更新配置
- */
- updateConfig(newConfig: Partial<AsterUserStreamConfig>): void {
- this.config = { ...this.config, ...newConfig };
- }
- }
|