/**
 * Unified hedging executor.
 *
 * Built entirely on the ExchangeAdapter interface: places a hedge order on a
 * primary exchange (and, when configured, the same order on a secondary
 * exchange), tracks the resulting net exposure per symbol, and attempts to
 * cancel already-acknowledged orders when the paired placement fails.
 */
import type { ExchangeAdapter, PlaceOrderReq, Order } from '../../exchanges/ExchangeAdapter'
import { PrecisionValidator } from '../../utils/precision'
import { AdapterErrorHandler } from '../../exchanges/AdapterErrorHandler'
import type { AdapterError } from '../../exchanges/AdapterErrorHandler'
import type { HedgeRequest, ExecutionResult } from './types'
import { hedgeCalculator } from './hedgeCalculator'
import { EventEmitter } from 'events'

/** Maximum tolerated difference between local time and an adapter's reported time. */
const MAX_CLOCK_SKEW_MS = 10000
/** Absolute net exposure above which the monitor emits `net_exposure_warning`. */
const NET_EXPOSURE_WARNING_THRESHOLD = 0.01
/** Absolute net exposure below which `forceNetZero` considers the book flat. */
const NET_ZERO_TOLERANCE = 0.001
/** Interval between two net-exposure monitor sweeps. */
const NET_EXPOSURE_CHECK_INTERVAL_MS = 30000

export interface AdapterPair {
  primary: ExchangeAdapter // primary exchange (executes the hedge)
  secondary?: ExchangeAdapter // optional backup exchange
  primaryId: string // identifier of the primary exchange
  secondaryId?: string // identifier of the backup exchange
}

export interface ExecutionConfig {
  maxRetries: number // maximum number of retries
  timeoutMs: number // overall execution timeout
  atomicTimeout: number // timeout (ms) for the paired order placement
  enableRollback: boolean // whether failed executions attempt a rollback
  slippageTolerance: number // tolerated slippage
  positionSizeLimit: number // notional limit for a single order
}

export interface HedgeExecution {
  id: string
  timestamp: number
  request: HedgeRequest
  primaryOrder?: Order
  secondaryOrder?: Order
  status: 'pending' | 'partial' | 'completed' | 'failed' | 'rolled_back'
  error?: AdapterError
  netExposure: number // net exposure contributed by this execution
  executionLatency: number // execution latency in milliseconds
}

/**
 * Executes hedge requests against one or two exchange adapters.
 *
 * Events emitted: `execution_started`, `execution_completed`,
 * `execution_failed`, `execution_rolled_back`, `rollback_failed`,
 * `net_exposure_updated`, `net_exposure_warning`, `net_zero_completed`,
 * `net_zero_failed`.
 *
 * The constructor starts a periodic exposure monitor; call {@link dispose}
 * when the executor is no longer needed so the interval is released.
 */
export class UnifiedHedgingExecutor extends EventEmitter {
  private readonly executions = new Map<string, HedgeExecution>()
  private readonly netExposureBySymbol = new Map<string, number>()
  private monitorTimer?: ReturnType<typeof setInterval>

  constructor(private readonly config: ExecutionConfig) {
    super()
    this.startNetExposureMonitor()
  }

  /**
   * Runs one hedge request end to end.
   *
   * @param request  The hedge request to evaluate and execute.
   * @param adapters The exchange adapters to trade on.
   * @returns The execution result; quantity and price are 0 when the
   *          calculator decides no hedge is needed.
   * @throws AdapterError when validation, order preparation or placement
   *         fails (after an optional rollback attempt).
   */
  async execute(request: HedgeRequest, adapters: AdapterPair): Promise<ExecutionResult> {
    const executionId = this.generateExecutionId()
    const startTime = Date.now()
    const execution: HedgeExecution = {
      id: executionId,
      timestamp: startTime,
      request,
      status: 'pending',
      netExposure: 0,
      executionLatency: 0,
    }

    this.executions.set(executionId, execution)
    this.emit('execution_started', execution)

    try {
      // 1. Compute the hedge decision.
      const decision = hedgeCalculator.decide(request)
      if (!decision.shouldHedge) {
        execution.status = 'completed'
        execution.executionLatency = Date.now() - startTime
        this.emit('execution_completed', execution)
        return { success: true, executedQuantity: 0, executedPrice: 0 }
      }

      // 2. Validate adapter health.
      await this.validateAdapters(adapters)

      // 3. Build the order parameters.
      const orderParams = await this.prepareOrders(request, decision, adapters)

      // 4. Place the paired orders.
      const result = await this.executeAtomicPair(orderParams, adapters, execution)

      execution.executionLatency = Date.now() - startTime
      this.emit('execution_completed', execution)
      return result
    } catch (error: unknown) {
      // AdapterError is a type-only import, so it cannot be used with
      // `instanceof` (it does not exist at runtime). Normalise every failure
      // through the handler instead.
      // NOTE(review): presumably handleError returns an existing AdapterError
      // unchanged — confirm against AdapterErrorHandler.
      execution.error = AdapterErrorHandler.handleError('unified_executor', error, 'execute')
      execution.status = 'failed'
      execution.executionLatency = Date.now() - startTime
      this.emit('execution_failed', execution)

      // Try to undo whatever reached an exchange.
      if (this.config.enableRollback) {
        await this.attemptRollback(execution, adapters)
      }

      throw execution.error
    } finally {
      // Only 'completed' executions change the tracked exposure.
      this.updateNetExposure(request.symbol, execution)
    }
  }

  /**
   * Builds the order parameters for a hedge decision.
   *
   * Uses the top of the primary exchange's book as the price, adjusts price
   * and quantity through PrecisionValidator and enforces the notional limit.
   *
   * @throws Error when no price is available, the adjusted parameters are
   *         invalid, or the notional exceeds `positionSizeLimit`.
   */
  private async prepareOrders(
    request: HedgeRequest,
    // The calculator's decision type is not visible from this module.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    decision: any,
    adapters: AdapterPair,
  ) {
    const side: 'BUY' | 'SELL' = decision.hedgeQuantity > 0 ? 'BUY' : 'SELL'
    const type: 'MARKET' | 'LIMIT' = decision.method === 'spot' ? 'MARKET' : 'LIMIT'
    const absQuantity = Math.abs(decision.hedgeQuantity)

    // Price discovery from the order book: buys hit the ask, sells hit the bid.
    const primaryDepth = await adapters.primary.depth(request.symbol, 20)
    const bestPrice = side === 'BUY' ? primaryDepth.asks[0]?.price : primaryDepth.bids[0]?.price
    if (!bestPrice) {
      throw new Error(`无法获取 ${request.symbol} 的有效价格`)
    }

    // Precision validation and adjustment.
    const adjustedParams = PrecisionValidator.adjustOrderParams(
      adapters.primaryId,
      request.symbol,
      bestPrice,
      absQuantity,
    )
    if (!adjustedParams.valid) {
      throw new Error(`订单参数校验失败: ${adjustedParams.warnings.join(', ')}`)
    }

    // Enforce the per-order notional limit.
    const orderNotional = Number(adjustedParams.price) * Number(adjustedParams.quantity)
    if (orderNotional > this.config.positionSizeLimit) {
      throw new Error(`订单金额 ${orderNotional} 超过限制 ${this.config.positionSizeLimit}`)
    }

    return {
      symbol: request.symbol,
      side,
      type,
      quantity: adjustedParams.quantity,
      price: adjustedParams.price,
      clientOrderId: this.generateClientOrderId('hedge'),
    }
  }

  /**
   * Places the order on the primary and (if present) secondary exchange and
   * waits for both, bounded by `config.atomicTimeout`.
   *
   * Each order is stored on `execution` as soon as it is acknowledged, so a
   * later rollback can cancel it even when the sibling order fails or the
   * timeout fires first.
   *
   * @throws The first placement rejection, `Error('原子操作超时')` on timeout,
   *         or `Error('部分订单失败')` when an order has an unexpected status.
   */
  private async executeAtomicPair(
    orderParams: PlaceOrderReq,
    adapters: AdapterPair,
    execution: HedgeExecution,
  ): Promise<ExecutionResult> {
    const placements: Promise<Order>[] = [
      adapters.primary.placeOrder(orderParams).then(order => {
        execution.primaryOrder = order
        return order
      }),
    ]
    if (adapters.secondary) {
      placements.push(
        adapters.secondary.placeOrder(orderParams).then(order => {
          execution.secondaryOrder = order
          return order
        }),
      )
    }

    let timer: ReturnType<typeof setTimeout> | undefined
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('原子操作超时')), this.config.atomicTimeout)
    })

    try {
      // allSettled (not all): wait for every leg so that a fast failure on
      // one exchange does not trigger rollback before the other leg's order
      // id is known.
      const outcomes = await Promise.race([Promise.allSettled(placements), timeout])

      const orders: Order[] = []
      for (const outcome of outcomes) {
        if (outcome.status === 'rejected') throw outcome.reason
        orders.push(outcome.value)
      }

      const primaryOrder = orders[0]
      if (!primaryOrder) {
        throw new Error('部分订单失败')
      }

      // Every order must be live or (partially) filled.
      const allAccepted = orders.every(
        order => order.status === 'NEW' || order.status === 'FILLED' || order.status === 'PARTIALLY_FILLED',
      )
      if (!allAccepted) {
        throw new Error('部分订单失败')
      }

      execution.status = 'completed'
      return {
        success: true,
        orderId: primaryOrder.id,
        executedQuantity: Number(primaryOrder.origQty),
        executedPrice: Number(primaryOrder.price || 0),
      }
    } catch (error) {
      // Mark as partial only when at least one order actually reached an
      // exchange; the caller decides the final status.
      if (execution.primaryOrder || execution.secondaryOrder) {
        execution.status = 'partial'
      }
      throw error
    } finally {
      clearTimeout(timer)
    }
  }

  /**
   * Cancels the orders recorded on `execution`. Never throws.
   *
   * Sets the status to 'rolled_back' and emits `execution_rolled_back` only
   * when there was something to cancel and every cancel succeeded; emits
   * `rollback_failed` when any cancel fails. When no order was recorded the
   * execution is left untouched.
   */
  private async attemptRollback(execution: HedgeExecution, adapters: AdapterPair): Promise<void> {
    try {
      const { symbol } = execution.request
      const cancellations: Promise<unknown>[] = []

      if (execution.primaryOrder) {
        cancellations.push(adapters.primary.cancelOrder(symbol, execution.primaryOrder.id))
      }
      if (execution.secondaryOrder && adapters.secondary) {
        cancellations.push(adapters.secondary.cancelOrder(symbol, execution.secondaryOrder.id))
      }

      // Nothing reached an exchange, so there is nothing to roll back.
      if (cancellations.length === 0) return

      const outcomes = await Promise.allSettled(cancellations)
      const failed = outcomes.find(
        (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected',
      )
      if (failed) {
        // allSettled never rejects, so surface cancel failures explicitly.
        throw failed.reason
      }

      execution.status = 'rolled_back'
      this.emit('execution_rolled_back', execution)
      console.info(`🔄 对冲执行 ${execution.id} 已回滚`)
    } catch (rollbackError) {
      console.error(`❌ 回滚失败 ${execution.id}:`, rollbackError)
      this.emit('rollback_failed', { execution, error: rollbackError })
    }
  }

  /**
   * Validates the primary adapter and, when present, the secondary adapter
   * in parallel.
   *
   * @throws Error from {@link validateAdapter} for the first failing adapter.
   */
  private async validateAdapters(adapters: AdapterPair): Promise<void> {
    const checks = [this.validateAdapter(adapters.primary, adapters.primaryId)]
    if (adapters.secondary) {
      checks.push(this.validateAdapter(adapters.secondary, adapters.secondaryId ?? 'secondary'))
    }
    await Promise.all(checks)
  }

  /**
   * Validates a single adapter by comparing its reported time with the
   * local clock.
   *
   * @throws Error when the time call fails, returns a falsy value, or
   *         differs from local time by more than MAX_CLOCK_SKEW_MS.
   */
  private async validateAdapter(adapter: ExchangeAdapter, adapterId: string): Promise<void> {
    try {
      const time = await adapter.time()
      if (!time || Math.abs(Date.now() - time) > MAX_CLOCK_SKEW_MS) {
        throw new Error(`${adapterId} 时间同步失败`)
      }
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error)
      throw new Error(`${adapterId} 适配器验证失败: ${reason}`)
    }
  }

  /**
   * Adds a completed execution's signed primary-order quantity to the
   * tracked exposure for `symbol` and emits `net_exposure_updated`.
   *
   * NOTE(review): only the primary order's `origQty` is counted; the
   * secondary order and the actually filled quantity are not considered —
   * confirm this is intended.
   */
  private updateNetExposure(symbol: string, execution: HedgeExecution): void {
    if (execution.status !== 'completed' || !execution.primaryOrder) return

    const previous = this.netExposureBySymbol.get(symbol) ?? 0
    const orderQty = Number(execution.primaryOrder.origQty)
    const delta = execution.primaryOrder.side === 'BUY' ? orderQty : -orderQty

    execution.netExposure = delta
    this.netExposureBySymbol.set(symbol, previous + delta)
    this.emit('net_exposure_updated', { symbol, previous, current: previous + delta })
  }

  /**
   * Starts the periodic sweep that emits `net_exposure_warning` for every
   * symbol whose absolute exposure exceeds the warning threshold.
   */
  private startNetExposureMonitor(): void {
    this.monitorTimer = setInterval(() => {
      for (const [symbol, exposure] of this.netExposureBySymbol.entries()) {
        if (Math.abs(exposure) > NET_EXPOSURE_WARNING_THRESHOLD) {
          this.emit('net_exposure_warning', { symbol, exposure })
        }
      }
    }, NET_EXPOSURE_CHECK_INTERVAL_MS)
  }

  /**
   * Stops the net-exposure monitor. Safe to call more than once.
   */
  dispose(): void {
    if (this.monitorTimer !== undefined) {
      clearInterval(this.monitorTimer)
      this.monitorTimer = undefined
    }
  }

  /**
   * Places a market order on the primary exchange in the opposite direction
   * of the tracked exposure for `symbol`, then removes the neutralised
   * amount from the tracked exposure.
   *
   * Does nothing when the absolute exposure is below NET_ZERO_TOLERANCE.
   *
   * @throws Whatever `placeOrder` rejects with (after emitting `net_zero_failed`).
   */
  async forceNetZero(symbol: string, adapters: AdapterPair): Promise<void> {
    const netExposure = this.netExposureBySymbol.get(symbol) ?? 0
    if (Math.abs(netExposure) < NET_ZERO_TOLERANCE) return // already close enough to zero

    console.info(`🎯 强制净零操作: ${symbol}, 当前敞口: ${netExposure}`)

    // Opposite-side order that offsets the current exposure.
    const side = netExposure > 0 ? 'SELL' : 'BUY'
    const quantity = Math.abs(netExposure)
    const orderParams: PlaceOrderReq = {
      symbol,
      side,
      type: 'MARKET',
      quantity: String(quantity),
      clientOrderId: this.generateClientOrderId('zero'),
    }

    try {
      await adapters.primary.placeOrder(orderParams)
      // Subtract what was neutralised instead of overwriting with 0, so an
      // exposure change recorded while the order was in flight is not lost.
      const latest = this.netExposureBySymbol.get(symbol) ?? 0
      this.netExposureBySymbol.set(symbol, latest - netExposure)
      this.emit('net_zero_completed', { symbol, previousExposure: netExposure })
    } catch (error) {
      this.emit('net_zero_failed', { symbol, exposure: netExposure, error })
      throw error
    }
  }

  /**
   * Returns aggregate statistics over the retained execution records.
   *
   * `successRate` and `averageLatency` are 0 (not NaN) when there are no
   * executions / no completed executions. `netExposures` is a copy.
   */
  getExecutionStats(): {
    totalExecutions: number
    successRate: number
    averageLatency: number
    netExposures: Map<string, number>
  } {
    const all = Array.from(this.executions.values())
    const completed = all.filter(e => e.status === 'completed')
    const totalLatency = completed.reduce((sum, e) => sum + e.executionLatency, 0)

    return {
      totalExecutions: all.length,
      successRate: all.length > 0 ? completed.length / all.length : 0,
      averageLatency: completed.length > 0 ? totalLatency / completed.length : 0,
      netExposures: new Map(this.netExposureBySymbol),
    }
  }

  /**
   * Drops execution records older than `maxAgeMs` (default: 24 hours).
   */
  cleanup(maxAgeMs = 24 * 60 * 60 * 1000): void {
    const cutoff = Date.now() - maxAgeMs
    for (const [id, execution] of this.executions.entries()) {
      if (execution.timestamp < cutoff) {
        this.executions.delete(id)
      }
    }
  }

  /** Generates an identifier for a new execution record. */
  private generateExecutionId(): string {
    return this.generateClientOrderId('hedge')
  }

  /** Builds `<prefix>_<epoch ms>_<up to 9 random base-36 chars>`. */
  private generateClientOrderId(prefix: string): string {
    return `${prefix}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
  }
}