// accountManager.ts (16 KB)

import { EventEmitter } from 'events'
import { ExchangeAdapter, PlaceOrderReq, Order, Position, Balance, Depth } from '../exchanges/ExchangeAdapter'
  3. export interface AccountKey {
  4. exchange: string // 例如 'pacifica' | 'aster' | 'binance'
  5. accountId: string // 账户唯一标识(如公钥/地址/子账户名)
  6. }
  7. export interface RegisteredAccount extends AccountKey {
  8. adapter: ExchangeAdapter
  9. meta?: Record<string, any>
  10. }
  11. export type AggregatedBalances = Array<Balance & AccountKey>
  12. export type AggregatedPositions = Array<Position & AccountKey>
  13. export interface AccountLiveState extends AccountKey {
  14. // 来自 WS 的最新聚合态(若有)
  15. // 统一字段采用字符串,避免精度丢失;raw 保留原文
  16. accountEquity?: string // ae
  17. marginUsed?: string // mu
  18. posCount?: number // pc
  19. openOrderCount?: number // oc
  20. stopCount?: number // sc
  21. asAssets?: string // as(资产聚合,若含义变动则作为备注)
  22. alertWarning?: string // aw(预警值/比率)
  23. lastInfoTs?: number // t(ms)
  24. infoRaw?: any
  25. balancesRaw?: any
  26. positionsRaw?: any
  27. positionSummary?: Array<{ symbol: string; netQty: string }> // 归一化后的净持仓(按符号)
  28. }
  29. /**
  30. * 多平台多账户集中管理器
  31. * - 统一注册与路由:按 (exchange, accountId) 获取对应适配器
  32. * - 聚合查询:balances/positions across all accounts
  33. * - 精准路由:place/cancel/get/openOrders/leverage/depth 到指定账户
  34. * - 事件汇聚:把各适配器的 WS 事件转发,并附加 exchange/accountId 标签
  35. */
  36. export class AccountManager {
  37. private readonly accounts = new Map<string, RegisteredAccount>()
  38. private readonly events = new EventEmitter()
  39. private readonly wiredHandlers = new Map<string, Array<{ ev: string; fn: (p: any) => void }>>()
  40. private lastBalances?: { ts: number; data: AggregatedBalances }
  41. private lastPositions?: { ts: number; data: AggregatedPositions }
  42. private readonly liveStates = new Map<string, AccountLiveState>()
  43. private readonly pollers = new Map<string, NodeJS.Timeout>()
  44. /** 组合键 */
  45. private static keyOf(exchange: string, accountId: string): string {
  46. return `${exchange}::${accountId}`
  47. }
  48. /** 订阅并转发常见 WS 事件,payload 附加账户标签 */
  49. private wireEvents(reg: RegisteredAccount) {
  50. const src = reg.adapter.ws()
  51. const relay = (eventName: string) => (payload: any) => {
  52. this.events.emit(eventName, { ...payload, __am: { exchange: reg.exchange, accountId: reg.accountId } })
  53. this.events.emit('am:*', { event: eventName, payload, exchange: reg.exchange, accountId: reg.accountId })
  54. this.refreshLiveState(eventName, payload, reg)
  55. }
  56. const knownEvents = [
  57. 'depth',
  58. 'orders',
  59. 'trades',
  60. 'balance',
  61. 'account_info',
  62. 'account_positions',
  63. 'ws_error',
  64. 'ws_close',
  65. ]
  66. const handlers: Array<{ ev: string; fn: (p: any) => void }> = []
  67. for (const ev of knownEvents) {
  68. const fn = relay(ev)
  69. src.on(ev as any, fn)
  70. handlers.push({ ev, fn })
  71. }
  72. this.wiredHandlers.set(AccountManager.keyOf(reg.exchange, reg.accountId), handlers)
  73. }
  74. private refreshLiveState(eventName: string, payload: any, reg: RegisteredAccount) {
  75. // 仅处理账户类事件
  76. const k = AccountManager.keyOf(reg.exchange, reg.accountId)
  77. const prev = this.liveStates.get(k) || ({ exchange: reg.exchange, accountId: reg.accountId } as AccountLiveState)
  78. const ch = payload && typeof payload === 'object' ? payload.channel || payload.type : undefined
  79. if (eventName === 'account_info' || ch === 'account_info') {
  80. const d = payload && payload.data !== undefined ? payload.data : payload
  81. const next: AccountLiveState = {
  82. ...prev,
  83. accountEquity: d?.ae != null ? String(d.ae) : prev.accountEquity,
  84. marginUsed: d?.mu != null ? String(d.mu) : prev.marginUsed,
  85. posCount: d?.pc != null ? Number(d.pc) : prev.posCount,
  86. openOrderCount: d?.oc != null ? Number(d.oc) : prev.openOrderCount,
  87. stopCount: d?.sc != null ? Number(d.sc) : prev.stopCount,
  88. asAssets: d?.as != null ? String(d.as) : prev.asAssets,
  89. alertWarning: d?.aw != null ? String(d.aw) : prev.alertWarning,
  90. lastInfoTs: d?.t != null ? Number(d.t) : prev.lastInfoTs ?? Date.now(),
  91. infoRaw: d,
  92. }
  93. this.liveStates.set(k, next)
  94. this.events.emit('am:account_state', { ...next, __am: { exchange: reg.exchange, accountId: reg.accountId } })
  95. return
  96. }
  97. if (eventName === 'account_positions' || ch === 'account_positions') {
  98. const d = payload && payload.data !== undefined ? payload.data : payload
  99. const next: AccountLiveState = { ...prev, positionsRaw: d }
  100. // 计算净持仓摘要
  101. try {
  102. const sum = this.computePositionSummary(d)
  103. if (sum && sum.length) next.positionSummary = sum
  104. } catch {}
  105. this.liveStates.set(k, next)
  106. this.events.emit('am:account_state', { ...next, __am: { exchange: reg.exchange, accountId: reg.accountId } })
  107. return
  108. }
  109. if (eventName === 'balance' || eventName === 'account_balance' || ch === 'account_balance') {
  110. const d = payload && payload.data !== undefined ? payload.data : payload
  111. const next: AccountLiveState = { ...prev, balancesRaw: d }
  112. this.liveStates.set(k, next)
  113. this.events.emit('am:account_state', { ...next, __am: { exchange: reg.exchange, accountId: reg.accountId } })
  114. return
  115. }
  116. }
  117. /** 注册账户(幂等) */
  118. register(account: RegisteredAccount): void {
  119. const k = AccountManager.keyOf(account.exchange, account.accountId)
  120. if (this.accounts.has(k)) return
  121. this.accounts.set(k, account)
  122. this.wireEvents(account)
  123. // 默认开启账户信息轮询(无账户类 WS 的接入可准实时刷新;有 WS 的接入也作为兜底)
  124. this.startPollingBalancesPositions(5000, { exchange: account.exchange, accountId: account.accountId })
  125. }
  126. /** 移除账户 */
  127. unregister(exchange: string, accountId: string): void {
  128. const k = AccountManager.keyOf(exchange, accountId)
  129. const reg = this.accounts.get(k)
  130. if (reg) {
  131. const src = reg.adapter.ws()
  132. const handlers = this.wiredHandlers.get(k) || []
  133. for (const h of handlers) src.off(h.ev as any, h.fn as any)
  134. this.wiredHandlers.delete(k)
  135. }
  136. this.accounts.delete(k)
  137. this.liveStates.delete(k)
  138. const t = this.pollers.get(k)
  139. if (t) {
  140. clearInterval(t)
  141. this.pollers.delete(k)
  142. }
  143. }
  144. /** 列出所有账户键 */
  145. list(): AccountKey[] {
  146. return Array.from(this.accounts.values()).map(({ exchange, accountId }) => ({ exchange, accountId }))
  147. }
  148. /** 获取指定账户的适配器 */
  149. getAdapter(exchange: string, accountId: string): ExchangeAdapter | undefined {
  150. const k = AccountManager.keyOf(exchange, accountId)
  151. return this.accounts.get(k)?.adapter
  152. }
  153. /** 事件总线(带聚合标签) */
  154. ws(): EventEmitter {
  155. return this.events
  156. }
  157. // ========== 聚合查询 ==========
  158. async balancesAll(opts?: { cacheMs?: number }): Promise<AggregatedBalances> {
  159. if (opts?.cacheMs && this.lastBalances && Date.now() - this.lastBalances.ts <= opts.cacheMs) {
  160. return this.lastBalances.data
  161. }
  162. const tasks = Array.from(this.accounts.values()).map(async acc => {
  163. try {
  164. const list = await acc.adapter.balances()
  165. return list.map(b => ({ ...b, exchange: acc.exchange, accountId: acc.accountId }))
  166. } catch {
  167. return [] as AggregatedBalances
  168. }
  169. })
  170. const parts = await Promise.all(tasks)
  171. const data = parts.flat()
  172. this.lastBalances = { ts: Date.now(), data }
  173. return data
  174. }
  175. async positionsAll(opts?: { cacheMs?: number }): Promise<AggregatedPositions> {
  176. if (opts?.cacheMs && this.lastPositions && Date.now() - this.lastPositions.ts <= opts.cacheMs) {
  177. return this.lastPositions.data
  178. }
  179. const tasks = Array.from(this.accounts.values()).map(async acc => {
  180. try {
  181. const list = await acc.adapter.positions()
  182. return list.map(p => ({ ...p, exchange: acc.exchange, accountId: acc.accountId }))
  183. } catch {
  184. return [] as AggregatedPositions
  185. }
  186. })
  187. const parts = await Promise.all(tasks)
  188. const data = parts.flat()
  189. this.lastPositions = { ts: Date.now(), data }
  190. return data
  191. }
  192. // ========== 精准路由(单账户) ==========
  193. async placeOrderOn(exchange: string, accountId: string, req: PlaceOrderReq): Promise<Order> {
  194. const ad = this.getOrThrow(exchange, accountId)
  195. return await ad.placeOrder(req)
  196. }
  197. async cancelOrderOn(exchange: string, accountId: string, symbol: string, orderId: string): Promise<void> {
  198. const ad = this.getOrThrow(exchange, accountId)
  199. await ad.cancelOrder(symbol, orderId)
  200. }
  201. async cancelAllOn(exchange: string, accountId: string, symbol: string): Promise<void> {
  202. const ad = this.getOrThrow(exchange, accountId)
  203. await ad.cancelAll(symbol)
  204. }
  205. async openOrdersOn(exchange: string, accountId: string, symbol: string): Promise<Order[]> {
  206. const ad = this.getOrThrow(exchange, accountId)
  207. return await ad.openOrders(symbol)
  208. }
  209. async leverageOn(exchange: string, accountId: string, symbol: string, lev: number): Promise<void> {
  210. const ad = this.getOrThrow(exchange, accountId)
  211. if (!ad.leverage) throw new Error(`适配器 ${exchange} 不支持杠杆功能()`)
  212. await ad.leverage(symbol, lev)
  213. }
  214. async depthOn(exchange: string, accountId: string, symbol: string, limit = 50): Promise<Depth> {
  215. const ad = this.getOrThrow(exchange, accountId)
  216. return await ad.depth(symbol, limit)
  217. }
  218. // ========== 批量/广播工具 ==========
  219. async cancelAllByExchange(exchange: string, symbol: string): Promise<void> {
  220. const tasks: Array<Promise<void>> = []
  221. for (const acc of this.accounts.values()) {
  222. if (acc.exchange !== exchange) continue
  223. tasks.push(acc.adapter.cancelAll(symbol).catch(() => {}))
  224. }
  225. await Promise.all(tasks)
  226. }
  227. // 批量注册
  228. registerMany(accounts: RegisteredAccount[]): void {
  229. for (const a of accounts) this.register(a)
  230. }
  231. // 聚合 open orders(按符号)
  232. async openOrdersAll(symbol: string): Promise<Array<Order & AccountKey>> {
  233. const tasks = Array.from(this.accounts.values()).map(async acc => {
  234. try {
  235. const list = await acc.adapter.openOrders(symbol)
  236. return list.map(o => ({ ...o, exchange: acc.exchange, accountId: acc.accountId }) as any)
  237. } catch {
  238. return [] as Array<Order & AccountKey>
  239. }
  240. })
  241. const parts = await Promise.all(tasks)
  242. return parts.flat()
  243. }
  244. // 快照(balances + positions)
  245. async snapshot(opts?: {
  246. cacheMs?: number
  247. }): Promise<{ ts: number; balances: AggregatedBalances; positions: AggregatedPositions }> {
  248. const [balances, positions] = await Promise.all([this.balancesAll(opts), this.positionsAll(opts)])
  249. return { ts: Date.now(), balances, positions }
  250. }
  251. // 获取单账户或全部账户的实时聚合态(基于 WS 增量)
  252. getLiveState(exchange: string, accountId: string): AccountLiveState | undefined {
  253. return this.liveStates.get(AccountManager.keyOf(exchange, accountId))
  254. }
  255. getAllLiveStates(): AccountLiveState[] {
  256. return Array.from(this.liveStates.values())
  257. }
  258. // ========== 启动自检(最小实现) ==========
  259. async initCheck(opts?: { symbolHints?: Record<string, string>; soft?: boolean; timeSkewWarnMs?: number }) {
  260. const results: Array<{
  261. key: AccountKey
  262. ok: boolean
  263. steps: Array<{ name: string; ok: boolean; error?: string }>
  264. }> = []
  265. for (const acc of this.accounts.values()) {
  266. const steps: Array<{ name: string; ok: boolean; error?: string }> = []
  267. // 0) time skew
  268. try {
  269. const serverTs = await acc.adapter.time()
  270. const skew = Math.abs(Date.now() - serverTs)
  271. const limit = opts?.timeSkewWarnMs ?? 2000
  272. steps.push({
  273. name: `time(skew=${skew}ms)`,
  274. ok: skew <= limit,
  275. error: skew > limit ? `skew>${limit}` : undefined,
  276. })
  277. } catch (e: any) {
  278. steps.push({ name: 'time', ok: false, error: String(e?.message || e) })
  279. }
  280. // 1) symbols()
  281. try {
  282. const syms = await acc.adapter.symbols()
  283. steps.push({ name: 'symbols', ok: Array.isArray(syms) })
  284. } catch (e: any) {
  285. steps.push({ name: 'symbols', ok: false, error: String(e?.message || e) })
  286. }
  287. // 2) depth()
  288. try {
  289. const hint = opts?.symbolHints?.[acc.exchange] || (acc.exchange === 'aster' ? 'BTCUSDT' : 'BTC')
  290. const d = await acc.adapter.depth(hint, 5)
  291. steps.push({ name: `depth(${hint})`, ok: !!(d && d.bids && d.asks) })
  292. } catch (e: any) {
  293. steps.push({ name: 'depth', ok: false, error: String(e?.message || e) })
  294. }
  295. // 3) balances()/positions()(可能需要鉴权,失败不判死刑)
  296. try {
  297. await acc.adapter.balances()
  298. steps.push({ name: 'balances', ok: true })
  299. } catch (e: any) {
  300. steps.push({ name: 'balances', ok: false, error: String(e?.message || e) })
  301. }
  302. try {
  303. await acc.adapter.positions()
  304. steps.push({ name: 'positions', ok: true })
  305. } catch (e: any) {
  306. steps.push({ name: 'positions', ok: false, error: String(e?.message || e) })
  307. }
  308. const ok = steps.filter(s => s.name === 'symbols' || s.name.startsWith('depth')).every(s => s.ok)
  309. results.push({ key: { exchange: acc.exchange, accountId: acc.accountId }, ok, steps })
  310. if (!ok && !opts?.soft) {
  311. const msg = steps.map(s => `${s.name}:${s.ok ? 'ok' : 'fail'}`).join(', ')
  312. throw new Error(`初始化检查失败 ${acc.exchange}::${acc.accountId} -> ${msg}`)
  313. }
  314. }
  315. return results
  316. }
  317. // ========== 辅助 ==========
  318. private getOrThrow(exchange: string, accountId: string): ExchangeAdapter {
  319. const ad = this.getAdapter(exchange, accountId)
  320. if (!ad) throw new Error(`账户管理器: 未找到适配器 ${exchange}::${accountId}`)
  321. return ad
  322. }
  323. // ========== 轮询补齐(无账户 WS 的接入,如 Aster) ==========
  324. startPollingBalancesPositions(intervalMs = 5000, filter?: { exchange?: string; accountId?: string }) {
  325. for (const acc of this.accounts.values()) {
  326. if (filter?.exchange && acc.exchange !== filter.exchange) continue
  327. if (filter?.accountId && acc.accountId !== filter.accountId) continue
  328. const key = AccountManager.keyOf(acc.exchange, acc.accountId)
  329. if (this.pollers.has(key)) continue
  330. const timer = setInterval(
  331. async () => {
  332. try {
  333. const [bals, poss] = await Promise.all([
  334. acc.adapter.balances().catch(() => undefined),
  335. acc.adapter.positions().catch(() => undefined),
  336. ])
  337. const prev =
  338. this.liveStates.get(key) || ({ exchange: acc.exchange, accountId: acc.accountId } as AccountLiveState)
  339. const next: AccountLiveState = { ...prev }
  340. if (bals !== undefined) next.balancesRaw = bals
  341. if (poss !== undefined) {
  342. next.positionsRaw = poss
  343. try {
  344. const sum = this.computePositionSummary(poss)
  345. if (sum && sum.length) next.positionSummary = sum
  346. } catch {}
  347. }
  348. if (bals !== undefined || poss !== undefined) {
  349. this.liveStates.set(key, next)
  350. this.events.emit('am:account_state', {
  351. ...next,
  352. __am: { exchange: acc.exchange, accountId: acc.accountId },
  353. })
  354. }
  355. } catch {}
  356. },
  357. Math.max(1000, intervalMs),
  358. )
  359. this.pollers.set(key, timer)
  360. }
  361. }
  362. /**
  363. * 从 positions 原始数据推导净持仓(按符号)。
  364. * 兼容 Pacifica({ symbol, amount })与 Aster({ symbol, positionAmt })等形态。
  365. */
  366. private computePositionSummary(raw: any): Array<{ symbol: string; netQty: string }> {
  367. const arr: any[] = Array.isArray(raw) ? raw : raw && Array.isArray(raw.positions) ? raw.positions : []
  368. const map = new Map<string, number>()
  369. for (const p of arr) {
  370. const sym = String(p?.symbol ?? '').toUpperCase()
  371. if (!sym) continue
  372. const amtStr = p?.amount ?? p?.positionAmt ?? p?.qty ?? '0'
  373. const amt = Number(amtStr)
  374. if (!Number.isFinite(amt)) continue
  375. map.set(sym, (map.get(sym) ?? 0) + amt)
  376. }
  377. return Array.from(map.entries()).map(([symbol, net]) => ({ symbol, netQty: String(net) }))
  378. }
  379. }
  380. export default AccountManager