import { EventEmitter } from 'events'
import { ExchangeAdapter, PlaceOrderReq, Order, Position, Balance, Depth } from '../exchanges/ExchangeAdapter'

/** Identifies one account on one exchange. */
export interface AccountKey {
  /** Exchange name, e.g. 'pacifica' | 'aster' | 'binance'. */
  exchange: string
  /** Unique account identifier (e.g. public key / address / sub-account name). */
  accountId: string
}

/** An account together with the adapter used to talk to its exchange. */
export interface RegisteredAccount extends AccountKey {
  adapter: ExchangeAdapter
  meta?: Record<string, any>
}

/** Balances from every account, each row tagged with its owning account. */
export type AggregatedBalances = Array<Balance & AccountKey>
/** Positions from every account, each row tagged with its owning account. */
export type AggregatedPositions = Array<Position & AccountKey>

/**
 * Latest aggregated per-account state, built from WS events and from polling.
 * Numeric amounts are kept as strings to avoid precision loss; the `*Raw`
 * fields keep the payload exactly as received.
 */
export interface AccountLiveState extends AccountKey {
  /** Payload field `ae`. */
  accountEquity?: string
  /** Payload field `mu`. */
  marginUsed?: string
  /** Payload field `pc`. */
  posCount?: number
  /** Payload field `oc`. */
  openOrderCount?: number
  /** Payload field `sc`. */
  stopCount?: number
  /** Payload field `as` (asset aggregate; treat as a note if its meaning changes). */
  asAssets?: string
  /** Payload field `aw` (alert value / ratio). */
  alertWarning?: string
  /** Payload field `t` (ms). */
  lastInfoTs?: number
  infoRaw?: any
  balancesRaw?: any
  positionsRaw?: any
  /** Normalised net position per symbol. */
  positionSummary?: Array<{ symbol: string; netQty: string }>
}

/** One listener attached to an adapter's WS emitter, kept so it can be detached later. */
type WiredHandler = { ev: string; fn: (p: any) => void }

/** Handle returned by `setInterval` (differs between Node and browser typings). */
type PollTimer = ReturnType<typeof setInterval>

/** Outcome of one step of `initCheck`. */
type InitCheckStep = { name: string; ok: boolean; error?: string }

/**
 * Central manager for multiple accounts across multiple exchanges.
 * - Registration and routing: look up the adapter by (exchange, accountId).
 * - Aggregated queries: balances / positions / open orders across all accounts.
 * - Targeted routing: place / cancel / openOrders / leverage / depth on one account.
 * - Event fan-in: re-emits each adapter's WS events tagged with exchange/accountId.
 */
export class AccountManager {
  private readonly accounts = new Map<string, RegisteredAccount>()
  private readonly events = new EventEmitter()
  private readonly wiredHandlers = new Map<string, WiredHandler[]>()
  private lastBalances?: { ts: number; data: AggregatedBalances }
  private lastPositions?: { ts: number; data: AggregatedPositions }
  private readonly liveStates = new Map<string, AccountLiveState>()
  private readonly pollers = new Map<string, PollTimer>()

  /** Composite map key for an account. */
  private static keyOf(exchange: string, accountId: string): string {
    return `${exchange}::${accountId}`
  }

  /** Same text the original `String(e?.message || e)` produced, without typing `e` as `any`. */
  private static errorText(e: unknown): string {
    const message = (e as { message?: unknown } | null | undefined)?.message
    return String(message || e)
  }

  /**
   * Subscribes to the adapter's known WS events and relays each one on the
   * manager's bus three ways: under its own name (payload spread plus an
   * `__am` tag), under `'am:*'` (as an envelope), and into the live state.
   */
  private wireEvents(reg: RegisteredAccount): void {
    const src = reg.adapter.ws()
    const tag = { exchange: reg.exchange, accountId: reg.accountId }
    const relay = (eventName: string) => (payload: any) => {
      this.events.emit(eventName, { ...payload, __am: { ...tag } })
      this.events.emit('am:*', { event: eventName, payload, ...tag })
      this.refreshLiveState(eventName, payload, reg)
    }
    const knownEvents = [
      'depth',
      'orders',
      'trades',
      'balance',
      'account_info',
      'account_positions',
      'ws_error',
      'ws_close',
    ]
    const handlers: WiredHandler[] = knownEvents.map(ev => {
      const fn = relay(ev)
      src.on(ev as any, fn)
      return { ev, fn }
    })
    this.wiredHandlers.set(AccountManager.keyOf(reg.exchange, reg.accountId), handlers)
  }

  /** Stores `next` as the account's live state and announces it on `'am:account_state'`. */
  private publishLiveState(key: string, next: AccountLiveState, owner: AccountKey): void {
    this.liveStates.set(key, next)
    this.events.emit('am:account_state', {
      ...next,
      __am: { exchange: owner.exchange, accountId: owner.accountId },
    })
  }

  /**
   * Writes `raw` into `state.positionsRaw` and refreshes `state.positionSummary`.
   * An empty list clears the summary, so a flat account no longer reports its
   * previous holdings. A payload of unrecognised shape leaves the summary as is.
   */
  private applyPositions(state: AccountLiveState, raw: any): void {
    state.positionsRaw = raw
    try {
      if (this.positionList(raw) !== undefined) {
        state.positionSummary = this.computePositionSummary(raw)
      }
    } catch {
      // Best effort: a malformed row must not prevent the raw payload from being stored.
    }
  }

  /**
   * Folds an account-level event into the live state. Only account_info,
   * account_positions and balance / account_balance events are handled,
   * matched by event name or by the payload's `channel` / `type`.
   */
  private refreshLiveState(eventName: string, payload: any, reg: RegisteredAccount): void {
    const key = AccountManager.keyOf(reg.exchange, reg.accountId)
    const prev: AccountLiveState = this.liveStates.get(key) || {
      exchange: reg.exchange,
      accountId: reg.accountId,
    }
    const ch = payload && typeof payload === 'object' ? payload.channel || payload.type : undefined
    // Payloads arrive either wrapped as { data } or bare.
    const d = payload && payload.data !== undefined ? payload.data : payload

    if (eventName === 'account_info' || ch === 'account_info') {
      const next: AccountLiveState = {
        ...prev,
        accountEquity: d?.ae != null ? String(d.ae) : prev.accountEquity,
        marginUsed: d?.mu != null ? String(d.mu) : prev.marginUsed,
        posCount: d?.pc != null ? Number(d.pc) : prev.posCount,
        openOrderCount: d?.oc != null ? Number(d.oc) : prev.openOrderCount,
        stopCount: d?.sc != null ? Number(d.sc) : prev.stopCount,
        asAssets: d?.as != null ? String(d.as) : prev.asAssets,
        alertWarning: d?.aw != null ? String(d.aw) : prev.alertWarning,
        lastInfoTs: d?.t != null ? Number(d.t) : prev.lastInfoTs ?? Date.now(),
        infoRaw: d,
      }
      this.publishLiveState(key, next, reg)
      return
    }

    if (eventName === 'account_positions' || ch === 'account_positions') {
      const next: AccountLiveState = { ...prev }
      this.applyPositions(next, d)
      this.publishLiveState(key, next, reg)
      return
    }

    if (eventName === 'balance' || eventName === 'account_balance' || ch === 'account_balance') {
      this.publishLiveState(key, { ...prev, balancesRaw: d }, reg)
    }
  }

  /**
   * Registers an account (idempotent: an already registered key is ignored).
   * Wires the adapter's WS events and starts a 5 s balances/positions poll,
   * which serves adapters without account WS channels and acts as a fallback
   * for those that have them.
   */
  register(account: RegisteredAccount): void {
    const k = AccountManager.keyOf(account.exchange, account.accountId)
    if (this.accounts.has(k)) return
    this.accounts.set(k, account)
    this.wireEvents(account)
    this.startPollingBalancesPositions(5000, { exchange: account.exchange, accountId: account.accountId })
  }

  /** Removes an account: detaches its WS listeners, drops its live state and stops its poller. */
  unregister(exchange: string, accountId: string): void {
    const k = AccountManager.keyOf(exchange, accountId)
    const reg = this.accounts.get(k)
    if (reg) {
      const src = reg.adapter.ws()
      for (const h of this.wiredHandlers.get(k) || []) src.off(h.ev as any, h.fn as any)
      this.wiredHandlers.delete(k)
    }
    this.accounts.delete(k)
    this.liveStates.delete(k)
    const timer = this.pollers.get(k)
    if (timer) {
      clearInterval(timer)
      this.pollers.delete(k)
    }
  }

  /** Lists the keys of all registered accounts. */
  list(): AccountKey[] {
    return Array.from(this.accounts.values()).map(({ exchange, accountId }) => ({ exchange, accountId }))
  }

  /** Returns the adapter of the given account, or undefined when it is not registered. */
  getAdapter(exchange: string, accountId: string): ExchangeAdapter | undefined {
    return this.accounts.get(AccountManager.keyOf(exchange, accountId))?.adapter
  }

  /** The manager's event bus (relayed events carry the account tag). */
  ws(): EventEmitter {
    return this.events
  }

  // ========== Aggregated queries ==========

  /**
   * Fetches balances from every account in parallel and tags each row with its account.
   * A failing account contributes no rows. With a truthy `cacheMs`, a previous
   * result no older than that many milliseconds is returned instead.
   */
  async balancesAll(opts?: { cacheMs?: number }): Promise<AggregatedBalances> {
    if (opts?.cacheMs && this.lastBalances && Date.now() - this.lastBalances.ts <= opts.cacheMs) {
      return this.lastBalances.data
    }
    const tasks = Array.from(this.accounts.values()).map(async acc => {
      try {
        const list = await acc.adapter.balances()
        return list.map(b => ({ ...b, exchange: acc.exchange, accountId: acc.accountId }))
      } catch {
        return [] as AggregatedBalances
      }
    })
    const data = (await Promise.all(tasks)).flat()
    this.lastBalances = { ts: Date.now(), data }
    return data
  }

  /**
   * Fetches positions from every account in parallel and tags each row with its account.
   * A failing account contributes no rows. With a truthy `cacheMs`, a previous
   * result no older than that many milliseconds is returned instead.
   */
  async positionsAll(opts?: { cacheMs?: number }): Promise<AggregatedPositions> {
    if (opts?.cacheMs && this.lastPositions && Date.now() - this.lastPositions.ts <= opts.cacheMs) {
      return this.lastPositions.data
    }
    const tasks = Array.from(this.accounts.values()).map(async acc => {
      try {
        const list = await acc.adapter.positions()
        return list.map(p => ({ ...p, exchange: acc.exchange, accountId: acc.accountId }))
      } catch {
        return [] as AggregatedPositions
      }
    })
    const data = (await Promise.all(tasks)).flat()
    this.lastPositions = { ts: Date.now(), data }
    return data
  }

  // ========== Targeted routing (single account) ==========

  /** Places an order on the given account. @throws Error when the account is not registered. */
  async placeOrderOn(exchange: string, accountId: string, req: PlaceOrderReq): Promise<Order> {
    const ad = this.getOrThrow(exchange, accountId)
    return await ad.placeOrder(req)
  }

  /** Cancels one order on the given account. @throws Error when the account is not registered. */
  async cancelOrderOn(exchange: string, accountId: string, symbol: string, orderId: string): Promise<void> {
    const ad = this.getOrThrow(exchange, accountId)
    await ad.cancelOrder(symbol, orderId)
  }

  /** Cancels all orders for `symbol` on the given account. @throws Error when the account is not registered. */
  async cancelAllOn(exchange: string, accountId: string, symbol: string): Promise<void> {
    const ad = this.getOrThrow(exchange, accountId)
    await ad.cancelAll(symbol)
  }

  /** Lists open orders for `symbol` on the given account. @throws Error when the account is not registered. */
  async openOrdersOn(exchange: string, accountId: string, symbol: string): Promise<Order[]> {
    const ad = this.getOrThrow(exchange, accountId)
    return await ad.openOrders(symbol)
  }

  /**
   * Sets leverage for `symbol` on the given account.
   * @throws Error when the account is not registered or its adapter has no `leverage` method.
   */
  async leverageOn(exchange: string, accountId: string, symbol: string, lev: number): Promise<void> {
    const ad = this.getOrThrow(exchange, accountId)
    if (!ad.leverage) throw new Error(`适配器 ${exchange} 不支持杠杆功能()`)
    await ad.leverage(symbol, lev)
  }

  /** Fetches order-book depth through the given account's adapter. @throws Error when the account is not registered. */
  async depthOn(exchange: string, accountId: string, symbol: string, limit = 50): Promise<Depth> {
    const ad = this.getOrThrow(exchange, accountId)
    return await ad.depth(symbol, limit)
  }

  // ========== Batch / broadcast helpers ==========

  /** Cancels all orders for `symbol` on every account of `exchange`; individual failures are ignored. */
  async cancelAllByExchange(exchange: string, symbol: string): Promise<void> {
    const tasks: Array<Promise<void>> = []
    for (const acc of this.accounts.values()) {
      if (acc.exchange !== exchange) continue
      tasks.push(acc.adapter.cancelAll(symbol).catch(() => {}))
    }
    await Promise.all(tasks)
  }

  /** Registers several accounts; each goes through `register`. */
  registerMany(accounts: RegisteredAccount[]): void {
    for (const a of accounts) this.register(a)
  }

  /** Open orders for `symbol` across all accounts, tagged per account; a failing account contributes none. */
  async openOrdersAll(symbol: string): Promise<Array<Order & AccountKey>> {
    const tasks = Array.from(this.accounts.values()).map(async acc => {
      try {
        const list = await acc.adapter.openOrders(symbol)
        return list.map(o => ({ ...o, exchange: acc.exchange, accountId: acc.accountId }) as Order & AccountKey)
      } catch {
        return [] as Array<Order & AccountKey>
      }
    })
    return (await Promise.all(tasks)).flat()
  }

  /** Snapshot of balances and positions, fetched in parallel; `opts` is passed to both queries. */
  async snapshot(opts?: { cacheMs?: number }): Promise<{ ts: number; balances: AggregatedBalances; positions: AggregatedPositions }> {
    const [balances, positions] = await Promise.all([this.balancesAll(opts), this.positionsAll(opts)])
    return { ts: Date.now(), balances, positions }
  }

  /** Live state of one account, or undefined when nothing has been recorded for it. */
  getLiveState(exchange: string, accountId: string): AccountLiveState | undefined {
    return this.liveStates.get(AccountManager.keyOf(exchange, accountId))
  }

  /** Live states of all accounts that have one. */
  getAllLiveStates(): AccountLiveState[] {
    return Array.from(this.liveStates.values())
  }

  // ========== Startup self-check (minimal) ==========

  /**
   * Runs time / symbols / depth / balances / positions against every account, sequentially.
   * An account passes when its symbols and depth steps pass. The time, balances
   * and positions steps are recorded but do not decide the verdict.
   *
   * @param opts.symbolHints depth symbol per exchange; defaults to 'BTCUSDT' for 'aster', else 'BTC'
   * @param opts.soft when truthy, a failing account is reported instead of thrown
   * @param opts.timeSkewWarnMs allowed clock skew in ms (default 2000)
   * @throws Error on the first failing account unless `opts.soft` is set
   */
  async initCheck(opts?: { symbolHints?: Record<string, string>; soft?: boolean; timeSkewWarnMs?: number }) {
    const results: Array<{ key: AccountKey; ok: boolean; steps: InitCheckStep[] }> = []
    for (const acc of this.accounts.values()) {
      const steps: InitCheckStep[] = []

      // 0) time skew
      try {
        const serverTs = await acc.adapter.time()
        const skew = Math.abs(Date.now() - serverTs)
        const limit = opts?.timeSkewWarnMs ?? 2000
        steps.push({
          name: `time(skew=${skew}ms)`,
          ok: skew <= limit,
          error: skew > limit ? `skew>${limit}` : undefined,
        })
      } catch (e: unknown) {
        steps.push({ name: 'time', ok: false, error: AccountManager.errorText(e) })
      }

      // 1) symbols()
      try {
        const syms = await acc.adapter.symbols()
        steps.push({ name: 'symbols', ok: Array.isArray(syms) })
      } catch (e: unknown) {
        steps.push({ name: 'symbols', ok: false, error: AccountManager.errorText(e) })
      }

      // 2) depth()
      try {
        const hint = opts?.symbolHints?.[acc.exchange] || (acc.exchange === 'aster' ? 'BTCUSDT' : 'BTC')
        const d = await acc.adapter.depth(hint, 5)
        steps.push({ name: `depth(${hint})`, ok: !!(d && d.bids && d.asks) })
      } catch (e: unknown) {
        steps.push({ name: 'depth', ok: false, error: AccountManager.errorText(e) })
      }

      // 3) balances() / positions() may need auth, so a failure here is not fatal
      try {
        await acc.adapter.balances()
        steps.push({ name: 'balances', ok: true })
      } catch (e: unknown) {
        steps.push({ name: 'balances', ok: false, error: AccountManager.errorText(e) })
      }
      try {
        await acc.adapter.positions()
        steps.push({ name: 'positions', ok: true })
      } catch (e: unknown) {
        steps.push({ name: 'positions', ok: false, error: AccountManager.errorText(e) })
      }

      const ok = steps.filter(s => s.name === 'symbols' || s.name.startsWith('depth')).every(s => s.ok)
      results.push({ key: { exchange: acc.exchange, accountId: acc.accountId }, ok, steps })
      if (!ok && !opts?.soft) {
        const msg = steps.map(s => `${s.name}:${s.ok ? 'ok' : 'fail'}`).join(', ')
        throw new Error(`初始化检查失败 ${acc.exchange}::${acc.accountId} -> ${msg}`)
      }
    }
    return results
  }

  // ========== Helpers ==========

  /** Returns the adapter of the given account. @throws Error when it is not registered. */
  private getOrThrow(exchange: string, accountId: string): ExchangeAdapter {
    const ad = this.getAdapter(exchange, accountId)
    if (!ad) throw new Error(`账户管理器: 未找到适配器 ${exchange}::${accountId}`)
    return ad
  }

  // ========== Polling fallback (for adapters without account WS, e.g. Aster) ==========

  /**
   * Starts a periodic balances/positions poll for every registered account that
   * matches `filter` and has no poller yet.
   *
   * @param intervalMs poll period; values below 1000 are raised to 1000
   * @param filter optional exchange and/or accountId to restrict which accounts are polled
   */
  startPollingBalancesPositions(intervalMs = 5000, filter?: { exchange?: string; accountId?: string }): void {
    for (const acc of this.accounts.values()) {
      if (filter?.exchange && acc.exchange !== filter.exchange) continue
      if (filter?.accountId && acc.accountId !== filter.accountId) continue
      const key = AccountManager.keyOf(acc.exchange, acc.accountId)
      if (this.pollers.has(key)) continue
      const timer = setInterval(() => {
        void this.pollAccount(key, acc)
      }, Math.max(1000, intervalMs))
      this.pollers.set(key, timer)
    }
  }

  /**
   * One poll cycle: fetches balances and positions and folds whichever succeeded
   * into the live state. Never rejects; all errors are swallowed because polling
   * is best effort.
   */
  private async pollAccount(key: string, acc: RegisteredAccount): Promise<void> {
    try {
      const [bals, poss] = await Promise.all([
        acc.adapter.balances().catch(() => undefined),
        acc.adapter.positions().catch(() => undefined),
      ])
      // The account may have been unregistered (or replaced) while the requests
      // were in flight; writing now would resurrect state that unregister() removed.
      if (this.accounts.get(key) !== acc) return
      if (bals === undefined && poss === undefined) return
      const prev: AccountLiveState = this.liveStates.get(key) || {
        exchange: acc.exchange,
        accountId: acc.accountId,
      }
      const next: AccountLiveState = { ...prev }
      if (bals !== undefined) next.balancesRaw = bals
      if (poss !== undefined) this.applyPositions(next, poss)
      this.publishLiveState(key, next, acc)
    } catch {
      // Best effort: the next tick retries.
    }
  }

  /** Extracts the position rows from `raw` (a bare array or `{ positions: [...] }`), or undefined for any other shape. */
  private positionList(raw: any): any[] | undefined {
    if (Array.isArray(raw)) return raw
    if (raw && Array.isArray(raw.positions)) return raw.positions
    return undefined
  }

  /**
   * Derives the net position per symbol from raw positions data.
   * Accepts Pacifica-style ({ symbol, amount }) and Aster-style ({ symbol, positionAmt })
   * rows, plus `qty`. Symbols are upper-cased; rows without a symbol or with a
   * non-finite quantity are skipped.
   */
  private computePositionSummary(raw: any): Array<{ symbol: string; netQty: string }> {
    const rows = this.positionList(raw) ?? []
    const netBySymbol = new Map<string, number>()
    for (const p of rows) {
      const sym = String(p?.symbol ?? '').toUpperCase()
      if (!sym) continue
      const amt = Number(p?.amount ?? p?.positionAmt ?? p?.qty ?? '0')
      if (!Number.isFinite(amt)) continue
      netBySymbol.set(sym, (netBySymbol.get(sym) ?? 0) + amt)
    }
    return Array.from(netBySymbol.entries()).map(([symbol, net]) => ({ symbol, netQty: String(net) }))
  }
}

export default AccountManager