| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- /**
- * 统一对冲执行器
- * 完全基于 ExchangeAdapter 接口,实现原子配对交易和智能回退逻辑
- */
- import { ExchangeAdapter, PlaceOrderReq, Order } from '../../exchanges/ExchangeAdapter'
- import { PrecisionValidator } from '../../utils/precision'
- import { AdapterErrorHandler } from '../../exchanges/AdapterErrorHandler'
- import type { AdapterError } from '../../exchanges/AdapterErrorHandler'
- import { HedgeRequest, ExecutionResult } from './types'
- import { hedgeCalculator } from './hedgeCalculator'
- import { EventEmitter } from 'events'
/**
 * The exchange adapters a hedge execution may use, together with the
 * identifiers used for them in precision lookups and error messages.
 */
export interface AdapterPair {
  /** Primary exchange; the one that executes the hedge. */
  primary: ExchangeAdapter
  /** Optional backup exchange. */
  secondary?: ExchangeAdapter
  /** Identifier of the primary exchange. */
  primaryId: string
  /** Identifier of the backup exchange. */
  secondaryId?: string
}
/**
 * Tunables for the hedging executor.
 */
export interface ExecutionConfig {
  /** Maximum number of retries. */
  maxRetries: number
  /** Execution timeout. */
  timeoutMs: number
  /** Timeout for the atomic (paired) order placement. */
  atomicTimeout: number
  /** Whether the rollback mechanism is enabled. */
  enableRollback: boolean
  /** Slippage tolerance. */
  slippageTolerance: number
  /** Position size limit for a single operation. */
  positionSizeLimit: number
}
/**
 * Bookkeeping record for one hedge execution attempt.
 */
export interface HedgeExecution {
  /** Unique identifier of this execution. */
  id: string
  /** Timestamp associated with this execution. */
  timestamp: number
  /** The hedge request that triggered this execution. */
  request: HedgeRequest
  /** Order on the primary exchange, when one is recorded. */
  primaryOrder?: Order
  /** Order on the backup exchange, when one is recorded. */
  secondaryOrder?: Order
  /** Lifecycle state of the execution. */
  status: 'pending' | 'partial' | 'completed' | 'failed' | 'rolled_back'
  /** Error attached to the execution, if any. */
  error?: AdapterError
  /** Net exposure. */
  netExposure: number
  /** Execution latency in milliseconds. */
  executionLatency: number
}
/**
 * Unified hedging executor.
 *
 * Talks to exchanges exclusively through the `ExchangeAdapter` interface:
 * it asks `hedgeCalculator` whether to hedge, validates the adapters, builds a
 * precision-adjusted order, places it on the primary (and, when present, the
 * secondary) adapter under a timeout, and optionally cancels what was placed
 * when anything fails.
 *
 * Of `ExecutionConfig`, this class reads only `atomicTimeout`,
 * `enableRollback` and `positionSizeLimit`.
 *
 * Events emitted: `execution_started`, `execution_completed`,
 * `execution_failed`, `execution_rolled_back`, `rollback_failed`,
 * `net_exposure_updated`, `net_exposure_warning`, `net_zero_completed`,
 * `net_zero_failed`.
 *
 * The constructor starts a periodic exposure monitor; call
 * `stopNetExposureMonitor()` when the executor is no longer needed.
 */
export class UnifiedHedgingExecutor extends EventEmitter {
  /** Largest accepted difference between `Date.now()` and an adapter's reported time. */
  private static readonly MAX_CLOCK_SKEW_MS = 10000
  /** How often the exposure monitor scans the per-symbol totals. */
  private static readonly EXPOSURE_CHECK_INTERVAL_MS = 30000
  /** Absolute exposure above which `net_exposure_warning` is emitted. */
  private static readonly EXPOSURE_WARNING_THRESHOLD = 0.01
  /** Absolute exposure below which `forceNetZero` does nothing. */
  private static readonly NET_ZERO_TOLERANCE = 0.001

  private readonly executions = new Map<string, HedgeExecution>()
  private readonly netExposureBySymbol = new Map<string, number>()
  private exposureMonitor?: ReturnType<typeof setInterval>

  /**
   * @param config Execution tunables; see the class comment for the fields actually read.
   */
  constructor(private readonly config: ExecutionConfig) {
    super()
    this.startNetExposureMonitor()
  }

  /**
   * Runs one hedge execution end to end.
   *
   * @param request Hedge request; `request.symbol` selects the market.
   * @param adapters Adapters to validate and trade on.
   * @returns The execution result. When the calculator decides not to hedge,
   *   a successful result with zero quantity and price is returned.
   * @throws The failure, stored on the execution record as `error`, after
   *   `execution_failed` was emitted and (if enabled) rollback was attempted.
   */
  async execute(request: HedgeRequest, adapters: AdapterPair): Promise<ExecutionResult> {
    const startTime = Date.now()
    const execution: HedgeExecution = {
      id: this.generateExecutionId(),
      timestamp: startTime,
      request,
      status: 'pending',
      netExposure: 0,
      executionLatency: 0,
    }
    this.executions.set(execution.id, execution)
    this.emit('execution_started', execution)

    try {
      // 1. Ask the calculator whether a hedge is needed at all.
      const decision = hedgeCalculator.decide(request)
      if (!decision.shouldHedge) {
        execution.status = 'completed'
        execution.executionLatency = Date.now() - startTime
        this.emit('execution_completed', execution)
        return { success: true, executedQuantity: 0, executedPrice: 0 }
      }

      // 2. Make sure every adapter is reachable and its clock is sane.
      await this.validateAdapters(adapters)

      // 3. Build the order parameters.
      const orderParams = await this.prepareOrders(request, decision, adapters)

      // 4. Place the order(s).
      const result = await this.executeAtomicPair(orderParams, adapters, execution)
      execution.executionLatency = Date.now() - startTime
      this.emit('execution_completed', execution)
      return result
    } catch (error) {
      // NOTE(review): this file imports `AdapterError` with `import type`, so it
      // is not available as a runtime value for `instanceof`. Confirm whether it
      // is a class (then import it as a value) or an interface (then this check
      // needs a structural guard).
      execution.error =
        error instanceof AdapterError ? error : AdapterErrorHandler.handleError('unified_executor', error, 'execute')
      execution.status = 'failed'
      execution.executionLatency = Date.now() - startTime
      this.emit('execution_failed', execution)

      if (this.config.enableRollback) {
        await this.attemptRollback(execution, adapters)
      }
      throw execution.error
    } finally {
      // Only a completed execution changes the exposure; see updateNetExposure.
      this.updateNetExposure(request.symbol, execution)
    }
  }

  /**
   * Stops the periodic exposure monitor started by the constructor.
   * Safe to call more than once.
   */
  stopNetExposureMonitor(): void {
    if (this.exposureMonitor !== undefined) {
      clearInterval(this.exposureMonitor)
      this.exposureMonitor = undefined
    }
  }

  /**
   * Builds the order parameters for a hedge decision.
   *
   * The side follows the sign of `decision.hedgeQuantity`, the price is the
   * best ask (BUY) or best bid (SELL) of the primary adapter's order book, and
   * price/quantity are passed through `PrecisionValidator.adjustOrderParams`.
   *
   * @param request Hedge request supplying the symbol.
   * @param decision Calculator output; `hedgeQuantity` and `method` are read.
   * @param adapters Adapter pair; only the primary is queried.
   * @returns Order parameters ready for `placeOrder`.
   * @throws Error when the quantity is not a non-zero finite number, no price
   *   is available, precision validation fails, or the notional is invalid or
   *   exceeds `positionSizeLimit`.
   */
  // NOTE(review): `decision` stays `any` because the calculator's return type is not visible here.
  private async prepareOrders(request: HedgeRequest, decision: any, adapters: AdapterPair) {
    const hedgeQuantity = Number(decision.hedgeQuantity)
    const absQuantity = Math.abs(hedgeQuantity)
    if (!Number.isFinite(absQuantity) || absQuantity === 0) {
      // Without this guard a missing quantity would silently become a SELL of NaN.
      throw new Error(`无效的对冲数量: ${decision.hedgeQuantity}`)
    }
    const side: 'BUY' | 'SELL' = hedgeQuantity > 0 ? 'BUY' : 'SELL'

    // Price discovery from the top of the primary order book.
    const primaryDepth = await adapters.primary.depth(request.symbol, 20)
    const bestPrice = side === 'BUY' ? primaryDepth.asks[0]?.price : primaryDepth.bids[0]?.price
    if (!bestPrice) {
      throw new Error(`无法获取 ${request.symbol} 的有效价格`)
    }

    // Precision validation and adjustment.
    const adjustedParams = PrecisionValidator.adjustOrderParams(
      adapters.primaryId,
      request.symbol,
      bestPrice,
      absQuantity,
    )
    if (!adjustedParams.valid) {
      throw new Error(`订单参数校验失败: ${adjustedParams.warnings.join(', ')}`)
    }

    // Notional limit. NaN compares false against any limit, so reject it explicitly.
    const orderNotional = Number(adjustedParams.price) * Number(adjustedParams.quantity)
    if (!Number.isFinite(orderNotional)) {
      throw new Error(`订单金额无效: 价格 ${adjustedParams.price}, 数量 ${adjustedParams.quantity}`)
    }
    if (orderNotional > this.config.positionSizeLimit) {
      throw new Error(`订单金额 ${orderNotional} 超过限制 ${this.config.positionSizeLimit}`)
    }

    return {
      symbol: request.symbol,
      side,
      type: decision.method === 'spot' ? ('MARKET' as const) : ('LIMIT' as const),
      quantity: adjustedParams.quantity,
      price: adjustedParams.price,
      clientOrderId: `hedge_${Date.now()}_${this.randomSuffix()}`,
    }
  }

  /**
   * Places the same order on the primary adapter and, when present, on the
   * secondary adapter, bounded by `config.atomicTimeout` milliseconds.
   *
   * Every order is recorded on `execution` as soon as its adapter returns it,
   * so a later rollback can cancel the leg that succeeded even when the other
   * leg failed. An order that arrives after the timeout fired is still
   * recorded, but nothing in this method cancels it.
   *
   * @param orderParams Order to place on each adapter.
   * @param adapters Adapter pair.
   * @param execution Record that receives the orders and the status.
   * @returns Result built from the primary order.
   * @throws The first placement error, a timeout error, or an error when an
   *   order comes back in a status other than NEW, FILLED or PARTIALLY_FILLED.
   */
  private async executeAtomicPair(
    orderParams: PlaceOrderReq,
    adapters: AdapterPair,
    execution: HedgeExecution,
  ): Promise<ExecutionResult> {
    const placements: Promise<Order>[] = [
      adapters.primary.placeOrder(orderParams).then(order => {
        execution.primaryOrder = order
        return order
      }),
    ]
    const secondary = adapters.secondary
    if (secondary) {
      placements.push(
        secondary.placeOrder(orderParams).then(order => {
          execution.secondaryOrder = order
          return order
        }),
      )
    }

    let timer: ReturnType<typeof setTimeout> | undefined
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('原子操作超时')), this.config.atomicTimeout)
    })

    try {
      // allSettled (not all): wait for every leg so that a fast failure on one
      // exchange does not hide an order that the other exchange accepted.
      const settled = await Promise.race([Promise.allSettled(placements), timeout])

      const orders: Order[] = []
      for (const outcome of settled) {
        if (outcome.status === 'rejected') throw outcome.reason
        orders.push(outcome.value)
      }

      const primaryOrder = orders[0]
      const allAccepted = orders.every(
        order => order.status === 'NEW' || order.status === 'FILLED' || order.status === 'PARTIALLY_FILLED',
      )
      if (!primaryOrder || !allAccepted) {
        throw new Error('部分订单失败')
      }

      execution.status = 'completed'
      return {
        success: true,
        orderId: primaryOrder.id,
        // NOTE(review): reports the original order quantity, not a filled quantity.
        executedQuantity: Number(primaryOrder.origQty),
        executedPrice: Number(primaryOrder.price || 0),
      }
    } catch (error) {
      // "partial" only when at least one leg actually produced an order.
      if (execution.primaryOrder || execution.secondaryOrder) {
        execution.status = 'partial'
      }
      throw error
    } finally {
      // Do not leave the timeout pending after the race is decided.
      clearTimeout(timer)
    }
  }

  /**
   * Cancels the orders recorded on `execution`.
   *
   * Marks the execution `rolled_back` and emits `execution_rolled_back` only
   * when there was something to cancel and every cancel call succeeded. When
   * nothing was placed the status is left untouched. When any cancel fails,
   * `rollback_failed` is emitted with the first failure and the status is
   * left untouched. This method never throws.
   *
   * @param execution Record holding the orders to cancel.
   * @param adapters Adapter pair the orders were placed on.
   */
  private async attemptRollback(execution: HedgeExecution, adapters: AdapterPair): Promise<void> {
    const symbol = execution.request.symbol
    const cancellations: Promise<unknown>[] = []

    const primaryOrder = execution.primaryOrder
    if (primaryOrder) {
      // Wrapped in an async function so a synchronous throw becomes a rejection.
      cancellations.push((async () => adapters.primary.cancelOrder(symbol, primaryOrder.id))())
    }
    const secondary = adapters.secondary
    const secondaryOrder = execution.secondaryOrder
    if (secondaryOrder && secondary) {
      cancellations.push((async () => secondary.cancelOrder(symbol, secondaryOrder.id))())
    }

    // Nothing was placed, so there is nothing to undo and the status stays as is.
    if (cancellations.length === 0) return

    // allSettled never rejects; failures have to be read from the outcomes.
    const outcomes = await Promise.allSettled(cancellations)
    const failure = outcomes.find((o): o is PromiseRejectedResult => o.status === 'rejected')
    if (failure) {
      console.error(`❌ 回滚失败 ${execution.id}:`, failure.reason)
      this.emit('rollback_failed', { execution, error: failure.reason })
      return
    }

    execution.status = 'rolled_back'
    this.emit('execution_rolled_back', execution)
    console.info(`🔄 对冲执行 ${execution.id} 已回滚`)
  }

  /**
   * Validates the primary adapter and, when present, the secondary adapter
   * concurrently.
   *
   * @throws Error from `validateAdapter` for the first adapter that fails.
   */
  private async validateAdapters(adapters: AdapterPair): Promise<void> {
    const checks = [this.validateAdapter(adapters.primary, adapters.primaryId)]
    if (adapters.secondary) {
      // Fall back to a readable label when no identifier was supplied.
      checks.push(this.validateAdapter(adapters.secondary, adapters.secondaryId ?? 'secondary'))
    }
    await Promise.all(checks)
  }

  /**
   * Checks that an adapter answers `time()` and that the answer is within
   * `MAX_CLOCK_SKEW_MS` of `Date.now()`.
   *
   * @param adapter Adapter to probe.
   * @param adapterId Label used in the error message.
   * @throws Error prefixed with the adapter label when the probe fails.
   */
  private async validateAdapter(adapter: ExchangeAdapter, adapterId: string): Promise<void> {
    try {
      const time = await adapter.time()
      if (!time || Math.abs(Date.now() - time) > UnifiedHedgingExecutor.MAX_CLOCK_SKEW_MS) {
        throw new Error(`${adapterId} 时间同步失败`)
      }
    } catch (error) {
      // Catch variables are `unknown`; do not assume an Error instance.
      const reason = error instanceof Error ? error.message : String(error)
      throw new Error(`${adapterId} 适配器验证失败: ${reason}`)
    }
  }

  /**
   * Adds the primary order's signed quantity to the symbol's running exposure.
   * Does nothing unless the execution is `completed` and has a primary order
   * with a finite quantity.
   *
   * @param symbol Symbol whose total is updated.
   * @param execution Execution to account for; its `netExposure` is set to the delta.
   */
  private updateNetExposure(symbol: string, execution: HedgeExecution): void {
    const order = execution.primaryOrder
    if (execution.status !== 'completed' || !order) return

    const orderQty = Number(order.origQty)
    // A NaN here would make the running total NaN for good.
    if (!Number.isFinite(orderQty)) return

    const delta = order.side === 'BUY' ? orderQty : -orderQty
    const previous = this.netExposureBySymbol.get(symbol) ?? 0
    const current = previous + delta

    execution.netExposure = delta
    this.netExposureBySymbol.set(symbol, current)
    this.emit('net_exposure_updated', { symbol, previous, current })
  }

  /**
   * Starts the periodic scan that emits `net_exposure_warning` for every
   * symbol whose absolute exposure exceeds the warning threshold. Calling it
   * while a monitor is already running is a no-op.
   */
  private startNetExposureMonitor(): void {
    if (this.exposureMonitor !== undefined) return

    this.exposureMonitor = setInterval(() => {
      for (const [symbol, exposure] of this.netExposureBySymbol.entries()) {
        if (Math.abs(exposure) > UnifiedHedgingExecutor.EXPOSURE_WARNING_THRESHOLD) {
          this.emit('net_exposure_warning', { symbol, exposure })
        }
      }
    }, UnifiedHedgingExecutor.EXPOSURE_CHECK_INTERVAL_MS)
  }

  /**
   * Places a market order on the primary adapter in the direction that offsets
   * the tracked exposure of `symbol`.
   *
   * @param symbol Symbol to neutralise.
   * @param adapters Adapter pair; only the primary is used.
   * @throws Whatever `placeOrder` throws, after emitting `net_zero_failed`.
   */
  async forceNetZero(symbol: string, adapters: AdapterPair): Promise<void> {
    const netExposure = this.netExposureBySymbol.get(symbol) ?? 0
    if (Math.abs(netExposure) < UnifiedHedgingExecutor.NET_ZERO_TOLERANCE) return // already close enough to zero

    console.info(`🎯 强制净零操作: ${symbol}, 当前敞口: ${netExposure}`)

    const orderParams: PlaceOrderReq = {
      symbol,
      side: netExposure > 0 ? 'SELL' : 'BUY',
      type: 'MARKET',
      // NOTE(review): the quantity is not precision-adjusted, unlike prepareOrders.
      quantity: String(Math.abs(netExposure)),
      // Random suffix: two calls in the same millisecond must not share an id.
      clientOrderId: `zero_${Date.now()}_${this.randomSuffix()}`,
    }

    try {
      await adapters.primary.placeOrder(orderParams)
      // Subtract what was offset instead of writing 0: the total may have been
      // changed by another execution while the order was in flight.
      const latest = this.netExposureBySymbol.get(symbol) ?? 0
      this.netExposureBySymbol.set(symbol, latest - netExposure)
      this.emit('net_zero_completed', { symbol, previousExposure: netExposure })
    } catch (error) {
      this.emit('net_zero_failed', { symbol, exposure: netExposure, error })
      throw error
    }
  }

  /**
   * Summarises the recorded executions.
   *
   * @returns Total count, share of executions in status `completed`, mean
   *   latency of those, and a copy of the per-symbol exposure map. Ratios are
   *   0 (not NaN) when there is nothing to average over.
   */
  getExecutionStats(): {
    totalExecutions: number
    successRate: number
    averageLatency: number
    netExposures: Map<string, number>
  } {
    const all = Array.from(this.executions.values())
    const completed = all.filter(e => e.status === 'completed')
    const totalLatency = completed.reduce((sum, e) => sum + e.executionLatency, 0)

    return {
      totalExecutions: all.length,
      successRate: all.length > 0 ? completed.length / all.length : 0,
      averageLatency: completed.length > 0 ? totalLatency / completed.length : 0,
      netExposures: new Map(this.netExposureBySymbol),
    }
  }

  /**
   * Drops execution records older than `maxAgeMs`.
   *
   * @param maxAgeMs Maximum age in milliseconds; defaults to 24 hours.
   */
  cleanup(maxAgeMs = 24 * 60 * 60 * 1000): void {
    const cutoff = Date.now() - maxAgeMs
    for (const [id, execution] of this.executions.entries()) {
      if (execution.timestamp < cutoff) {
        this.executions.delete(id)
      }
    }
  }

  /** Builds an execution id of the form `hedge_<timestamp>_<random>`. */
  private generateExecutionId(): string {
    return `hedge_${Date.now()}_${this.randomSuffix()}`
  }

  /** Up to nine base-36 characters of randomness (same as the former `substr(2, 9)`). */
  private randomSuffix(): string {
    return Math.random().toString(36).slice(2, 11)
  }
}
|