import { EventEmitter } from 'events'
import { ExchangeAdapter, PlaceOrderReq, Order, Position, Balance, Depth } from '../exchanges/ExchangeAdapter'
/** Identifies one account on one exchange. */
export interface AccountKey {
  /** Exchange name, e.g. 'pacifica' | 'aster' | 'binance'. */
  exchange: string
  /** Unique account identifier (e.g. public key / address / sub-account name). */
  accountId: string
}
/** An account key together with the adapter used to reach that account. */
export interface RegisteredAccount extends AccountKey {
  /** Exchange adapter that serves this account. */
  adapter: ExchangeAdapter
  /** Optional free-form metadata attached by the caller. */
  meta?: Record<string, any>
}
/** Balance rows gathered from several accounts, each tagged with the account it came from. */
export type AggregatedBalances = Array<Balance & AccountKey>
/** Position rows gathered from several accounts, each tagged with the account it came from. */
export type AggregatedPositions = Array<Position & AccountKey>
/**
 * Latest aggregated state known for one account (from WS pushes, if any).
 *
 * Unified fields are kept as strings to avoid precision loss; the `*Raw`
 * fields keep the original payloads untouched.
 */
export interface AccountLiveState extends AccountKey {
  /** Account equity (wire key `ae`). */
  accountEquity?: string
  /** Margin used (wire key `mu`). */
  marginUsed?: string
  /** Position count (wire key `pc`). */
  posCount?: number
  /** Open order count (wire key `oc`). */
  openOrderCount?: number
  /** Stop order count (wire key `sc`). */
  stopCount?: number
  /** Aggregated assets (wire key `as`); treat as a remark if the upstream meaning changes. */
  asAssets?: string
  /** Alert / warning value or ratio (wire key `aw`). */
  alertWarning?: string
  /** Timestamp of the last account-info update in milliseconds (wire key `t`). */
  lastInfoTs?: number
  /** Last account-info payload as received. */
  infoRaw?: any
  /** Last balances payload as received. */
  balancesRaw?: any
  /** Last positions payload as received. */
  positionsRaw?: any
  /** Normalized net position per symbol. */
  positionSummary?: Array<{ symbol: string; netQty: string }>
}
/**
 * Central manager for many accounts spread across many exchanges.
 *
 * - Registration and routing: look up the adapter for an (exchange, accountId) pair.
 * - Aggregated queries: balances / positions / open orders across every registered account.
 * - Targeted routing: place / cancel / open orders / leverage / depth on one specific account.
 * - Event fan-in: re-emits each adapter's WS events on one bus, tagged with exchange/accountId.
 */
export class AccountManager {
  /** Registered accounts, keyed by `keyOf(exchange, accountId)`. */
  private readonly accounts = new Map<string, RegisteredAccount>()
  /** Aggregated event bus handed out by `ws()`. */
  private readonly events = new EventEmitter()
  /** Listeners attached to each adapter's WS emitter, kept so `unregister` can detach them. */
  private readonly wiredHandlers = new Map<string, Array<{ ev: string; fn: (p: any) => void }>>()
  /** Result of the last `balancesAll` call; cleared whenever the account set changes. */
  private lastBalances?: { ts: number; data: AggregatedBalances }
  /** Result of the last `positionsAll` call; cleared whenever the account set changes. */
  private lastPositions?: { ts: number; data: AggregatedPositions }
  /** Latest live state per account key. */
  private readonly liveStates = new Map<string, AccountLiveState>()
  /** Polling timers per account key. */
  private readonly pollers = new Map<string, ReturnType<typeof setInterval>>()

  /** Builds the composite map key for an (exchange, accountId) pair. */
  private static keyOf(exchange: string, accountId: string): string {
    return `${exchange}::${accountId}`
  }

  /** Renders a caught value as text, preferring its `message` when that is truthy. */
  private static errorText(e: unknown): string {
    return String((e as { message?: unknown } | null | undefined)?.message || e)
  }

  /**
   * Subscribes to the adapter's common WS events and relays each one on the
   * manager bus three ways: under its own name with an `__am` tag merged into
   * the payload, under `'am:*'` as an envelope, and into the live-state tracker.
   */
  private wireEvents(reg: RegisteredAccount): void {
    const source = reg.adapter.ws()
    const relayedEvents = [
      'depth',
      'orders',
      'trades',
      'balance',
      'account_info',
      'account_positions',
      'ws_error',
      'ws_close',
    ]
    const handlers = relayedEvents.map(ev => {
      const fn = (payload: any): void => {
        this.events.emit(ev, { ...payload, __am: { exchange: reg.exchange, accountId: reg.accountId } })
        this.events.emit('am:*', { event: ev, payload, exchange: reg.exchange, accountId: reg.accountId })
        this.refreshLiveState(ev, payload, reg)
      }
      source.on(ev as any, fn)
      return { ev, fn }
    })
    this.wiredHandlers.set(AccountManager.keyOf(reg.exchange, reg.accountId), handlers)
  }

  /**
   * Folds an account-level event into the account's live state and publishes
   * the result. Events that are not account info / positions / balance are ignored.
   * The event kind is taken from the event name, or from the payload's
   * `channel` / `type` field when the payload is an object.
   */
  private refreshLiveState(eventName: string, payload: any, reg: RegisteredAccount): void {
    const channel = payload && typeof payload === 'object' ? payload.channel || payload.type : undefined
    const isInfo = eventName === 'account_info' || channel === 'account_info'
    const isPositions = eventName === 'account_positions' || channel === 'account_positions'
    const isBalance = eventName === 'balance' || eventName === 'account_balance' || channel === 'account_balance'
    if (!isInfo && !isPositions && !isBalance) return

    const key = AccountManager.keyOf(reg.exchange, reg.accountId)
    const prev: AccountLiveState = this.liveStates.get(key) ?? { exchange: reg.exchange, accountId: reg.accountId }
    // Payloads may be wrapped as { data: ... }; otherwise the payload itself is the data.
    const d = payload && payload.data !== undefined ? payload.data : payload

    let next: AccountLiveState
    if (isInfo) {
      // Each field falls back to its previous value when the update omits it.
      next = {
        ...prev,
        accountEquity: d?.ae != null ? String(d.ae) : prev.accountEquity,
        marginUsed: d?.mu != null ? String(d.mu) : prev.marginUsed,
        posCount: d?.pc != null ? Number(d.pc) : prev.posCount,
        openOrderCount: d?.oc != null ? Number(d.oc) : prev.openOrderCount,
        stopCount: d?.sc != null ? Number(d.sc) : prev.stopCount,
        asAssets: d?.as != null ? String(d.as) : prev.asAssets,
        alertWarning: d?.aw != null ? String(d.aw) : prev.alertWarning,
        lastInfoTs: d?.t != null ? Number(d.t) : (prev.lastInfoTs ?? Date.now()),
        infoRaw: d,
      }
    } else if (isPositions) {
      next = this.withPositions(prev, d)
    } else {
      next = { ...prev, balancesRaw: d }
    }
    this.publishState(key, next, reg)
  }

  /**
   * Returns a copy of `prev` carrying the new raw positions and, when the
   * payload has a recognised shape, a freshly computed net-position summary.
   * An empty position list yields an empty summary, so closed positions do
   * not linger. An unrecognised shape keeps the previous summary.
   */
  private withPositions(prev: AccountLiveState, raw: any): AccountLiveState {
    const next: AccountLiveState = { ...prev, positionsRaw: raw }
    try {
      if (AccountManager.positionRows(raw) !== undefined) {
        next.positionSummary = this.computePositionSummary(raw)
      }
    } catch {
      // Best effort: a malformed row must not block the raw update.
    }
    return next
  }

  /** Stores the state and announces it on `'am:account_state'` with an `__am` tag. */
  private publishState(key: string, state: AccountLiveState, owner: AccountKey): void {
    this.liveStates.set(key, state)
    this.events.emit('am:account_state', {
      ...state,
      __am: { exchange: owner.exchange, accountId: owner.accountId },
    })
  }

  /**
   * Registers an account (idempotent: a key that is already registered is left untouched).
   * Wires the adapter's WS events and starts a 5 s balances/positions poller for it.
   */
  register(account: RegisteredAccount): void {
    const key = AccountManager.keyOf(account.exchange, account.accountId)
    if (this.accounts.has(key)) return
    this.accounts.set(key, account)
    // The account set changed, so cached aggregates no longer describe it.
    this.lastBalances = undefined
    this.lastPositions = undefined
    this.wireEvents(account)
    // Polling is on by default: it feeds adapters without account-level WS and
    // acts as a fallback for those that have it.
    this.startPollingBalancesPositions(5000, { exchange: account.exchange, accountId: account.accountId })
  }

  /** Removes an account: detaches its WS listeners, stops its poller and drops its state. */
  unregister(exchange: string, accountId: string): void {
    const key = AccountManager.keyOf(exchange, accountId)
    const reg = this.accounts.get(key)
    if (reg) {
      const source = reg.adapter.ws()
      for (const h of this.wiredHandlers.get(key) ?? []) source.off(h.ev as any, h.fn as any)
      this.wiredHandlers.delete(key)
    }
    this.accounts.delete(key)
    this.liveStates.delete(key)
    // Cached aggregates may still contain rows of the removed account.
    this.lastBalances = undefined
    this.lastPositions = undefined
    const timer = this.pollers.get(key)
    if (timer) {
      clearInterval(timer)
      this.pollers.delete(key)
    }
  }

  /** Lists the keys of all registered accounts. */
  list(): AccountKey[] {
    return Array.from(this.accounts.values(), ({ exchange, accountId }) => ({ exchange, accountId }))
  }

  /** Returns the adapter of the given account, or `undefined` when it is not registered. */
  getAdapter(exchange: string, accountId: string): ExchangeAdapter | undefined {
    return this.accounts.get(AccountManager.keyOf(exchange, accountId))?.adapter
  }

  /** Returns the aggregated event bus (payloads carry account tags). */
  ws(): EventEmitter {
    return this.events
  }

  // ========== Aggregated queries ==========

  /**
   * Runs `load` against every registered adapter in parallel and returns all
   * rows tagged with their account. An account whose call fails contributes
   * no rows instead of failing the whole query.
   */
  private async collectAcrossAccounts<T extends object>(
    load: (adapter: ExchangeAdapter) => Promise<T[]>,
  ): Promise<Array<T & AccountKey>> {
    const perAccount = await Promise.all(
      Array.from(
        this.accounts.values(),
        async ({ adapter, exchange, accountId }): Promise<Array<T & AccountKey>> => {
          try {
            const rows = await load(adapter)
            return rows.map(row => ({ ...row, exchange, accountId }))
          } catch {
            return []
          }
        },
      ),
    )
    return perAccount.flat()
  }

  /**
   * Balances of all accounts.
   * @param opts.cacheMs when set and the last result is at most this old, that result is returned.
   */
  async balancesAll(opts?: { cacheMs?: number }): Promise<AggregatedBalances> {
    const cached = this.lastBalances
    if (opts?.cacheMs && cached && Date.now() - cached.ts <= opts.cacheMs) return cached.data
    const data = await this.collectAcrossAccounts<Balance>(adapter => adapter.balances())
    this.lastBalances = { ts: Date.now(), data }
    return data
  }

  /**
   * Positions of all accounts.
   * @param opts.cacheMs when set and the last result is at most this old, that result is returned.
   */
  async positionsAll(opts?: { cacheMs?: number }): Promise<AggregatedPositions> {
    const cached = this.lastPositions
    if (opts?.cacheMs && cached && Date.now() - cached.ts <= opts.cacheMs) return cached.data
    const data = await this.collectAcrossAccounts<Position>(adapter => adapter.positions())
    this.lastPositions = { ts: Date.now(), data }
    return data
  }

  // ========== Targeted routing (single account) ==========

  /** Places an order on the given account. Throws when the account is not registered. */
  async placeOrderOn(exchange: string, accountId: string, req: PlaceOrderReq): Promise<Order> {
    return await this.getOrThrow(exchange, accountId).placeOrder(req)
  }

  /** Cancels one order on the given account. Throws when the account is not registered. */
  async cancelOrderOn(exchange: string, accountId: string, symbol: string, orderId: string): Promise<void> {
    await this.getOrThrow(exchange, accountId).cancelOrder(symbol, orderId)
  }

  /** Cancels all orders for `symbol` on the given account. Throws when the account is not registered. */
  async cancelAllOn(exchange: string, accountId: string, symbol: string): Promise<void> {
    await this.getOrThrow(exchange, accountId).cancelAll(symbol)
  }

  /** Lists open orders for `symbol` on the given account. Throws when the account is not registered. */
  async openOrdersOn(exchange: string, accountId: string, symbol: string): Promise<Order[]> {
    return await this.getOrThrow(exchange, accountId).openOrders(symbol)
  }

  /**
   * Sets leverage for `symbol` on the given account.
   * Throws when the account is not registered or its adapter has no `leverage` method.
   */
  async leverageOn(exchange: string, accountId: string, symbol: string, lev: number): Promise<void> {
    const adapter = this.getOrThrow(exchange, accountId)
    if (!adapter.leverage) throw new Error(`适配器 ${exchange} 不支持杠杆功能()`)
    await adapter.leverage(symbol, lev)
  }

  /** Fetches order-book depth for `symbol` through the given account's adapter. */
  async depthOn(exchange: string, accountId: string, symbol: string, limit = 50): Promise<Depth> {
    return await this.getOrThrow(exchange, accountId).depth(symbol, limit)
  }

  // ========== Bulk / broadcast helpers ==========

  /** Cancels all orders for `symbol` on every account of `exchange`; per-account failures are ignored. */
  async cancelAllByExchange(exchange: string, symbol: string): Promise<void> {
    const pending: Array<Promise<void>> = []
    for (const acc of this.accounts.values()) {
      if (acc.exchange === exchange) pending.push(acc.adapter.cancelAll(symbol).catch(() => {}))
    }
    await Promise.all(pending)
  }

  /** Registers several accounts in order. */
  registerMany(accounts: RegisteredAccount[]): void {
    accounts.forEach(account => this.register(account))
  }

  /** Open orders for `symbol` across all accounts; an account whose query fails contributes nothing. */
  async openOrdersAll(symbol: string): Promise<Array<Order & AccountKey>> {
    return this.collectAcrossAccounts<Order>(adapter => adapter.openOrders(symbol))
  }

  /** Snapshot of balances and positions, fetched in parallel. `ts` is taken after both complete. */
  async snapshot(opts?: {
    cacheMs?: number
  }): Promise<{ ts: number; balances: AggregatedBalances; positions: AggregatedPositions }> {
    const [balances, positions] = await Promise.all([this.balancesAll(opts), this.positionsAll(opts)])
    return { ts: Date.now(), balances, positions }
  }

  /** Live state of one account, or `undefined` when none has been recorded yet. */
  getLiveState(exchange: string, accountId: string): AccountLiveState | undefined {
    return this.liveStates.get(AccountManager.keyOf(exchange, accountId))
  }

  /** Live states of all accounts that have one. */
  getAllLiveStates(): AccountLiveState[] {
    return Array.from(this.liveStates.values())
  }

  // ========== Startup self-check (minimal) ==========

  /**
   * Probes every registered adapter in turn: time skew, symbols, depth,
   * balances and positions. An account passes when its `symbols` and `depth`
   * steps pass; the other steps are reported but do not decide the verdict.
   *
   * @param opts.symbolHints per-exchange symbol for the depth probe (default 'BTCUSDT' for 'aster', else 'BTC')
   * @param opts.soft when true, a failing account is reported instead of thrown
   * @param opts.timeSkewWarnMs clock-skew threshold in ms (default 2000)
   * @throws Error for the first failing account unless `opts.soft` is set
   */
  async initCheck(opts?: {
    symbolHints?: Record<string, string>
    soft?: boolean
    timeSkewWarnMs?: number
  }): Promise<
    Array<{ key: AccountKey; ok: boolean; steps: Array<{ name: string; ok: boolean; error?: string }> }>
  > {
    type Step = { name: string; ok: boolean; error?: string }
    const results: Array<{ key: AccountKey; ok: boolean; steps: Step[] }> = []
    const skewLimit = opts?.timeSkewWarnMs ?? 2000

    for (const acc of this.accounts.values()) {
      const steps: Step[] = []
      // Runs one probe; a thrown error becomes a failed step named `failName`.
      const attempt = async (failName: string, probe: () => Promise<Step>): Promise<void> => {
        try {
          steps.push(await probe())
        } catch (e: unknown) {
          steps.push({ name: failName, ok: false, error: AccountManager.errorText(e) })
        }
      }

      // 0) time skew
      await attempt('time', async () => {
        const serverTs = await acc.adapter.time()
        const skew = Math.abs(Date.now() - serverTs)
        return {
          name: `time(skew=${skew}ms)`,
          ok: skew <= skewLimit,
          error: skew > skewLimit ? `skew>${skewLimit}` : undefined,
        }
      })
      // 1) symbols()
      await attempt('symbols', async () => {
        const syms = await acc.adapter.symbols()
        return { name: 'symbols', ok: Array.isArray(syms) }
      })
      // 2) depth()
      await attempt('depth', async () => {
        const hint = opts?.symbolHints?.[acc.exchange] || (acc.exchange === 'aster' ? 'BTCUSDT' : 'BTC')
        const book = await acc.adapter.depth(hint, 5)
        return { name: `depth(${hint})`, ok: !!(book && book.bids && book.asks) }
      })
      // 3) balances()/positions() may need credentials, so a failure is not fatal.
      await attempt('balances', async () => {
        await acc.adapter.balances()
        return { name: 'balances', ok: true }
      })
      await attempt('positions', async () => {
        await acc.adapter.positions()
        return { name: 'positions', ok: true }
      })

      const ok = steps.filter(s => s.name === 'symbols' || s.name.startsWith('depth')).every(s => s.ok)
      results.push({ key: { exchange: acc.exchange, accountId: acc.accountId }, ok, steps })
      if (!ok && !opts?.soft) {
        const msg = steps.map(s => `${s.name}:${s.ok ? 'ok' : 'fail'}`).join(', ')
        throw new Error(`初始化检查失败 ${acc.exchange}::${acc.accountId} -> ${msg}`)
      }
    }
    return results
  }

  // ========== Helpers ==========

  /** Returns the adapter of the given account or throws when it is not registered. */
  private getOrThrow(exchange: string, accountId: string): ExchangeAdapter {
    const adapter = this.getAdapter(exchange, accountId)
    if (!adapter) throw new Error(`账户管理器: 未找到适配器 ${exchange}::${accountId}`)
    return adapter
  }

  // ========== Polling fallback (for adapters without account-level WS, e.g. Aster) ==========

  /**
   * Starts a periodic balances/positions poll for every registered account
   * matching `filter` that does not already have a poller.
   *
   * @param intervalMs polling period; values below 1000 are raised to 1000
   * @param filter restricts the accounts by exchange and/or accountId
   */
  startPollingBalancesPositions(intervalMs = 5000, filter?: { exchange?: string; accountId?: string }): void {
    const period = Math.max(1000, intervalMs)
    for (const acc of this.accounts.values()) {
      if (filter?.exchange && acc.exchange !== filter.exchange) continue
      if (filter?.accountId && acc.accountId !== filter.accountId) continue
      const key = AccountManager.keyOf(acc.exchange, acc.accountId)
      if (this.pollers.has(key)) continue
      const timer = setInterval(() => {
        void this.pollOnce(acc, key)
      }, period)
      this.pollers.set(key, timer)
    }
  }

  /**
   * Performs one poll for `acc` and publishes the merged live state.
   * Never rejects: a failed request simply leaves that part of the state untouched.
   */
  private async pollOnce(acc: RegisteredAccount, key: string): Promise<void> {
    try {
      const [bals, poss] = await Promise.all([
        acc.adapter.balances().catch(() => undefined),
        acc.adapter.positions().catch(() => undefined),
      ])
      // The account may have been unregistered (or replaced) while the requests
      // were in flight; writing now would resurrect state for a removed account.
      if (this.accounts.get(key) !== acc) return
      if (bals === undefined && poss === undefined) return

      const prev: AccountLiveState = this.liveStates.get(key) ?? { exchange: acc.exchange, accountId: acc.accountId }
      let next: AccountLiveState = { ...prev }
      if (bals !== undefined) next.balancesRaw = bals
      if (poss !== undefined) next = this.withPositions(next, poss)
      this.publishState(key, next, acc)
    } catch {
      // Best effort: the next tick retries.
    }
  }

  /** Extracts the position rows from an array or a `{ positions: [...] }` payload; `undefined` otherwise. */
  private static positionRows(raw: any): any[] | undefined {
    if (Array.isArray(raw)) return raw
    if (raw && Array.isArray(raw.positions)) return raw.positions
    return undefined
  }

  /**
   * Derives the net position per symbol from raw positions data.
   * The quantity is read from `amount`, then `positionAmt`, then `qty`
   * (covering shapes such as Pacifica `{ symbol, amount }` and Aster
   * `{ symbol, positionAmt }`). Symbols are upper-cased; rows without a
   * symbol or with a non-finite quantity are skipped.
   */
  private computePositionSummary(raw: any): Array<{ symbol: string; netQty: string }> {
    const netBySymbol = new Map<string, number>()
    for (const row of AccountManager.positionRows(raw) ?? []) {
      const symbol = String(row?.symbol ?? '').toUpperCase()
      if (!symbol) continue
      const qty = Number(row?.amount ?? row?.positionAmt ?? row?.qty ?? '0')
      if (!Number.isFinite(qty)) continue
      netBySymbol.set(symbol, (netBySymbol.get(symbol) ?? 0) + qty)
    }
    return Array.from(netBySymbol, ([symbol, net]) => ({ symbol, netQty: String(net) }))
  }
}
export default AccountManager