userStreamClient.ts 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import { EventEmitter } from 'events';
  2. import WebSocket from 'ws';
  3. import { AsterListenKeyManager } from './listenKeyManager';
  4. import { AsterTradingConfig } from './tradingTypes';
  5. import {
  6. AsterUserStreamEvent,
  7. AsterUserStreamEvents,
  8. AsterListenKeyExpiredEvent,
  9. AsterAccountUpdateEvent,
  10. AsterOrderTradeUpdateEvent,
  11. AsterAccountConfigUpdateEvent
  12. } from './userStreamTypes';
  13. export interface AsterUserStreamConfig {
  14. wsUrl: string;
  15. autoReconnect?: boolean;
  16. reconnectIntervalMs?: number;
  17. maxReconnectAttempts?: number;
  18. pingIntervalMs?: number;
  19. pongTimeoutMs?: number;
  20. }
  21. export class AsterUserStreamClient extends EventEmitter {
  22. private config: AsterUserStreamConfig;
  23. private tradingConfig: AsterTradingConfig;
  24. private listenKeyManager: AsterListenKeyManager;
  25. private ws: WebSocket | null = null;
  26. private isConnected: boolean = false;
  27. private reconnectAttempts: number = 0;
  28. private pingTimer: NodeJS.Timeout | null = null;
  29. private pongTimer: NodeJS.Timeout | null = null;
  30. private reconnectTimer: NodeJS.Timeout | null = null;
  31. constructor(
  32. userStreamConfig: AsterUserStreamConfig,
  33. tradingConfig: AsterTradingConfig
  34. ) {
  35. super();
  36. this.config = {
  37. autoReconnect: true,
  38. reconnectIntervalMs: 5000,
  39. maxReconnectAttempts: 10,
  40. pingIntervalMs: 30000,
  41. pongTimeoutMs: 10000,
  42. ...userStreamConfig
  43. };
  44. this.tradingConfig = tradingConfig;
  45. this.listenKeyManager = new AsterListenKeyManager(tradingConfig);
  46. }
  47. /**
  48. * 连接用户数据流
  49. */
  50. async connect(): Promise<void> {
  51. if (this.ws) {
  52. console.log('⚠️ 用户数据流已连接');
  53. return;
  54. }
  55. try {
  56. // 获取或创建 ListenKey
  57. let listenKey = this.listenKeyManager.getListenKey();
  58. if (!listenKey) {
  59. listenKey = await this.listenKeyManager.createListenKey();
  60. }
  61. // 构建 WebSocket URL
  62. const wsUrl = `${this.config.wsUrl}/ws/${listenKey}`;
  63. console.log(`🔌 连接用户数据流: ${wsUrl}`);
  64. // 创建 WebSocket 连接
  65. this.ws = new WebSocket(wsUrl);
  66. this.ws.on('open', () => {
  67. console.log('✅ 用户数据流连接成功');
  68. this.isConnected = true;
  69. this.reconnectAttempts = 0;
  70. this.emit('connect');
  71. this.startPing();
  72. });
  73. this.ws.on('message', (data: WebSocket.Data) => {
  74. this.handleMessage(data);
  75. });
  76. this.ws.on('close', (code: number, reason: Buffer) => {
  77. console.log(`🔌 用户数据流连接断开: ${code} ${reason.toString()}`);
  78. this.isConnected = false;
  79. this.stopPing();
  80. this.emit('disconnect');
  81. if (this.config.autoReconnect && this.reconnectAttempts < this.config.maxReconnectAttempts!) {
  82. this.scheduleReconnect();
  83. }
  84. });
  85. this.ws.on('error', (error: Error) => {
  86. console.error('❌ 用户数据流错误:', error);
  87. this.emit('error', error);
  88. });
  89. this.ws.on('pong', () => {
  90. this.onPong();
  91. });
  92. } catch (error: any) {
  93. console.error('❌ 连接用户数据流失败:', error.message);
  94. this.emit('error', error);
  95. throw error;
  96. }
  97. }
  98. /**
  99. * 断开用户数据流连接
  100. */
  101. disconnect(): void {
  102. if (this.ws) {
  103. this.ws.close();
  104. this.ws = null;
  105. }
  106. this.isConnected = false;
  107. this.stopPing();
  108. this.stopReconnect();
  109. }
  110. /**
  111. * 销毁客户端
  112. */
  113. async destroy(): Promise<void> {
  114. this.disconnect();
  115. await this.listenKeyManager.destroy();
  116. }
  117. /**
  118. * 处理 WebSocket 消息
  119. */
  120. private handleMessage(data: WebSocket.Data): void {
  121. try {
  122. const message = JSON.parse(data.toString()) as AsterUserStreamEvent;
  123. switch (message.e) {
  124. case 'listenKeyExpired':
  125. this.handleListenKeyExpired(message as AsterListenKeyExpiredEvent);
  126. break;
  127. case 'ACCOUNT_UPDATE':
  128. this.handleAccountUpdate(message as AsterAccountUpdateEvent);
  129. break;
  130. case 'ORDER_TRADE_UPDATE':
  131. this.handleOrderTradeUpdate(message as AsterOrderTradeUpdateEvent);
  132. break;
  133. case 'ACCOUNT_CONFIG_UPDATE':
  134. this.handleAccountConfigUpdate(message as AsterAccountConfigUpdateEvent);
  135. break;
  136. default:
  137. console.log('📨 未知用户数据流事件:', message);
  138. }
  139. } catch (error: any) {
  140. console.error('❌ 解析用户数据流消息失败:', error.message);
  141. this.emit('error', error);
  142. }
  143. }
  144. /**
  145. * 处理 ListenKey 过期事件
  146. */
  147. private async handleListenKeyExpired(event: AsterListenKeyExpiredEvent): Promise<void> {
  148. console.log('⚠️ ListenKey 已过期,重新创建...');
  149. this.emit('listenKeyExpired', event);
  150. try {
  151. // 重新创建 ListenKey
  152. const newListenKey = await this.listenKeyManager.createListenKey();
  153. // 重新连接
  154. this.disconnect();
  155. await this.connect();
  156. this.emit('reconnect');
  157. } catch (error: any) {
  158. console.error('❌ 重新创建 ListenKey 失败:', error.message);
  159. this.emit('error', error);
  160. }
  161. }
  162. /**
  163. * 处理账户更新事件
  164. */
  165. private handleAccountUpdate(event: AsterAccountUpdateEvent): void {
  166. console.log('📊 账户更新:', {
  167. 原因: event.a.m,
  168. 余额变化: event.a.B.length,
  169. 持仓变化: event.a.P.length,
  170. 时间: new Date(event.E).toISOString()
  171. });
  172. this.emit('accountUpdate', event);
  173. }
  174. /**
  175. * 处理订单/交易更新事件
  176. */
  177. private handleOrderTradeUpdate(event: AsterOrderTradeUpdateEvent): void {
  178. const order = event.o;
  179. console.log('📋 订单更新:', {
  180. 交易对: order.s,
  181. 订单ID: order.i,
  182. 客户端订单ID: order.c,
  183. 方向: order.S,
  184. 类型: order.o,
  185. 状态: order.X,
  186. 执行类型: order.x,
  187. 数量: order.q,
  188. 价格: order.p,
  189. 时间: new Date(event.E).toISOString()
  190. });
  191. this.emit('orderTradeUpdate', event);
  192. }
  193. /**
  194. * 处理账户配置更新事件
  195. */
  196. private handleAccountConfigUpdate(event: AsterAccountConfigUpdateEvent): void {
  197. if (event.ac) {
  198. console.log('⚙️ 交易对配置更新:', {
  199. 交易对: event.ac.s,
  200. 杠杆倍数: event.ac.l,
  201. 时间: new Date(event.E).toISOString()
  202. });
  203. }
  204. if (event.ai) {
  205. console.log('⚙️ 账户配置更新:', {
  206. 联合保证金: event.ai.j,
  207. 时间: new Date(event.E).toISOString()
  208. });
  209. }
  210. this.emit('accountConfigUpdate', event);
  211. }
  212. /**
  213. * 启动 Ping
  214. */
  215. private startPing(): void {
  216. this.stopPing();
  217. this.pingTimer = setInterval(() => {
  218. if (this.ws && this.ws.readyState === WebSocket.OPEN) {
  219. this.ws.ping();
  220. this.startPongTimeout();
  221. }
  222. }, this.config.pingIntervalMs!);
  223. }
  224. /**
  225. * 停止 Ping
  226. */
  227. private stopPing(): void {
  228. if (this.pingTimer) {
  229. clearInterval(this.pingTimer);
  230. this.pingTimer = null;
  231. }
  232. this.stopPongTimeout();
  233. }
  234. /**
  235. * 启动 Pong 超时
  236. */
  237. private startPongTimeout(): void {
  238. this.stopPongTimeout();
  239. this.pongTimer = setTimeout(() => {
  240. console.log('⚠️ Pong 超时,断开连接');
  241. if (this.ws) {
  242. this.ws.terminate();
  243. }
  244. }, this.config.pongTimeoutMs!);
  245. }
  246. /**
  247. * 停止 Pong 超时
  248. */
  249. private stopPongTimeout(): void {
  250. if (this.pongTimer) {
  251. clearTimeout(this.pongTimer);
  252. this.pongTimer = null;
  253. }
  254. }
  255. /**
  256. * 处理 Pong
  257. */
  258. private onPong(): void {
  259. this.stopPongTimeout();
  260. }
  261. /**
  262. * 安排重连
  263. */
  264. private scheduleReconnect(): void {
  265. this.stopReconnect();
  266. this.reconnectAttempts++;
  267. const delay = this.config.reconnectIntervalMs! * this.reconnectAttempts;
  268. console.log(`🔄 ${delay / 1000} 秒后尝试重连 (${this.reconnectAttempts}/${this.config.maxReconnectAttempts})`);
  269. this.reconnectTimer = setTimeout(async () => {
  270. try {
  271. await this.connect();
  272. } catch (error: any) {
  273. console.error('❌ 重连失败:', error.message);
  274. if (this.reconnectAttempts < this.config.maxReconnectAttempts!) {
  275. this.scheduleReconnect();
  276. } else {
  277. console.error('❌ 达到最大重连次数,停止重连');
  278. }
  279. }
  280. }, delay);
  281. }
  282. /**
  283. * 停止重连
  284. */
  285. private stopReconnect(): void {
  286. if (this.reconnectTimer) {
  287. clearTimeout(this.reconnectTimer);
  288. this.reconnectTimer = null;
  289. }
  290. }
  291. /**
  292. * 获取连接状态
  293. */
  294. isStreamConnected(): boolean {
  295. return this.isConnected && this.ws?.readyState === WebSocket.OPEN;
  296. }
  297. /**
  298. * 获取 ListenKey
  299. */
  300. getListenKey(): string | null {
  301. return this.listenKeyManager.getListenKey();
  302. }
  303. /**
  304. * 手动延长 ListenKey
  305. */
  306. async extendListenKey(): Promise<void> {
  307. return this.listenKeyManager.extendListenKey();
  308. }
  309. /**
  310. * 更新配置
  311. */
  312. updateConfig(newConfig: Partial<AsterUserStreamConfig>): void {
  313. this.config = { ...this.config, ...newConfig };
  314. }
  315. }