// UnifiedHedgingExecutor.ts
  1. /**
  2. * 统一对冲执行器
  3. * 完全基于 ExchangeAdapter 接口,实现原子配对交易和智能回退逻辑
  4. */
  5. import { ExchangeAdapter, PlaceOrderReq, Order } from '../../exchanges/ExchangeAdapter'
  6. import { PrecisionValidator } from '../../utils/precision'
  7. import { AdapterErrorHandler } from '../../exchanges/AdapterErrorHandler'
  8. import type { AdapterError } from '../../exchanges/AdapterErrorHandler'
  9. import { HedgeRequest, ExecutionResult } from './types'
  10. import { hedgeCalculator } from './hedgeCalculator'
  11. import { EventEmitter } from 'events'
  /**
   * The exchange adapters a hedge execution may use: a mandatory primary
   * exchange and an optional backup exchange, each with its identifier.
   */
  export interface AdapterPair {
    /** Identifier of the primary exchange. */
    primaryId: string
    /** Primary exchange adapter (the one that executes the hedge). */
    primary: ExchangeAdapter
    /** Identifier of the backup exchange. */
    secondaryId?: string
    /** Backup exchange adapter. */
    secondary?: ExchangeAdapter
  }
  /**
   * Tunable limits and switches for hedge execution.
   */
  export interface ExecutionConfig {
    /** Maximum number of retries. */
    maxRetries: number
    /** Execution timeout. */
    timeoutMs: number
    /** Timeout applied to the atomic (paired) order operation. */
    atomicTimeout: number
    /** Whether the rollback mechanism is enabled. */
    enableRollback: boolean
    /** Slippage tolerance. */
    slippageTolerance: number
    /** Position size limit for a single operation. */
    positionSizeLimit: number
  }
  /**
   * Bookkeeping record for a single hedge execution attempt.
   */
  export interface HedgeExecution {
    /** Identifier of this execution. */
    id: string
    /** Timestamp stored when the execution record is created. */
    timestamp: number
    /** The hedge request being executed. */
    request: HedgeRequest
    /** Lifecycle state of the execution. */
    status:
      | 'pending'
      | 'partial'
      | 'completed'
      | 'failed'
      | 'rolled_back'
    /** Order placed on the primary exchange, if any. */
    primaryOrder?: Order
    /** Order placed on the backup exchange, if any. */
    secondaryOrder?: Order
    /** Error recorded for the execution, if any. */
    error?: AdapterError
    /** Net exposure. */
    netExposure: number
    /** Execution latency in milliseconds. */
    executionLatency: number
  }
  /**
   * Executes hedge orders through `ExchangeAdapter` instances and tracks the
   * resulting per-symbol net exposure.
   *
   * Emitted events: `execution_started`, `execution_completed`,
   * `execution_failed`, `execution_rolled_back`, `rollback_failed`,
   * `net_exposure_updated`, `net_exposure_warning`, `net_zero_completed`,
   * `net_zero_failed`.
   *
   * The constructor starts a periodic exposure monitor; call {@link dispose}
   * when the executor is no longer needed.
   */
  export class UnifiedHedgingExecutor extends EventEmitter {
    /** Order statuses treated as a successfully accepted placement. */
    private static readonly ACCEPTED_ORDER_STATUSES: readonly string[] = ['NEW', 'FILLED', 'PARTIALLY_FILLED']
    /** Absolute exposure above which the monitor emits a warning. */
    private static readonly EXPOSURE_WARNING_THRESHOLD = 0.01
    /** Absolute exposure below which `forceNetZero` does nothing. */
    private static readonly NET_ZERO_EPSILON = 0.001
    /** Interval between exposure monitor checks (30 seconds). */
    private static readonly MONITOR_INTERVAL_MS = 30000
    /** Maximum tolerated difference between local and exchange time. */
    private static readonly MAX_CLOCK_SKEW_MS = 10000

    private executions = new Map<string, HedgeExecution>()
    private netExposureBySymbol = new Map<string, number>()
    private exposureMonitor?: ReturnType<typeof setInterval>

    /**
     * @param config Execution limits and switches; the exposure monitor is
     *   started immediately.
     */
    constructor(private config: ExecutionConfig) {
      super()
      this.startNetExposureMonitor()
    }

    /**
     * Runs one hedge request end to end: decide, validate adapters, prepare
     * the order, place it on the primary (and optional backup) exchange.
     *
     * @param request Hedge request to execute.
     * @param adapters Exchange adapters to place orders on.
     * @returns The execution result; a zero-quantity success when the
     *   calculator decides no hedge is needed.
     * @throws The normalised adapter error when any step fails. When
     *   `config.enableRollback` is set, a rollback is attempted first.
     */
    async execute(request: HedgeRequest, adapters: AdapterPair): Promise<ExecutionResult> {
      const executionId = this.generateExecutionId()
      const startTime = Date.now()
      const execution: HedgeExecution = {
        id: executionId,
        timestamp: startTime,
        request,
        status: 'pending',
        netExposure: 0,
        executionLatency: 0,
      }
      this.executions.set(executionId, execution)
      this.emit('execution_started', execution)

      try {
        // 1. Compute the hedge decision.
        const decision = hedgeCalculator.decide(request)
        if (!decision.shouldHedge) {
          execution.status = 'completed'
          execution.executionLatency = Date.now() - startTime
          this.emit('execution_completed', execution)
          return { success: true, executedQuantity: 0, executedPrice: 0 }
        }

        // 2. Validate adapter state.
        await this.validateAdapters(adapters)

        // 3. Prepare the order parameters.
        const orderParams = await this.prepareOrders(request, decision, adapters)

        // 4. Place the paired orders.
        const result = await this.executeAtomicPair(orderParams, adapters, execution)
        execution.executionLatency = Date.now() - startTime
        this.emit('execution_completed', execution)
        return result
      } catch (error: unknown) {
        // `AdapterError` is imported with `import type`, so it has no runtime
        // value and cannot be used with `instanceof`; every failure is
        // normalised through the handler instead.
        // NOTE(review): confirm handleError passes through errors that are
        // already AdapterError instances without re-wrapping them.
        execution.error = AdapterErrorHandler.handleError('unified_executor', error, 'execute')
        execution.status = 'failed'
        execution.executionLatency = Date.now() - startTime
        this.emit('execution_failed', execution)

        if (this.config.enableRollback) {
          await this.attemptRollback(execution, adapters)
        }
        throw execution.error
      } finally {
        // Only completed executions with a primary order change the exposure.
        this.updateNetExposure(request.symbol, execution)
      }
    }

    /**
     * Builds the order request for a hedge decision.
     *
     * The side follows the sign of `decision.hedgeQuantity`; the price is the
     * best ask (BUY) or best bid (SELL) from the primary exchange's depth.
     *
     * @throws Error when no price is available, when precision validation
     *   fails, or when the notional exceeds `config.positionSizeLimit`.
     */
    // The decision type is declared outside this file, so it stays loosely typed here.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    private async prepareOrders(request: HedgeRequest, decision: any, adapters: AdapterPair) {
      const side = decision.hedgeQuantity > 0 ? 'BUY' : 'SELL'
      const absQuantity = Math.abs(decision.hedgeQuantity)

      // Fetch market depth for price discovery.
      const primaryDepth = await adapters.primary.depth(request.symbol, 20)
      const bestPrice = side === 'BUY' ? primaryDepth.asks[0]?.price : primaryDepth.bids[0]?.price
      if (!bestPrice) {
        throw new Error(`无法获取 ${request.symbol} 的有效价格`)
      }

      // Validate and adjust price/quantity precision.
      const adjustedParams = PrecisionValidator.adjustOrderParams(
        adapters.primaryId,
        request.symbol,
        bestPrice,
        absQuantity,
      )
      if (!adjustedParams.valid) {
        throw new Error(`订单参数校验失败: ${adjustedParams.warnings.join(', ')}`)
      }

      // Enforce the per-operation size limit on the order notional.
      const orderNotional = Number(adjustedParams.price) * Number(adjustedParams.quantity)
      if (orderNotional > this.config.positionSizeLimit) {
        throw new Error(`订单金额 ${orderNotional} 超过限制 ${this.config.positionSizeLimit}`)
      }

      return {
        symbol: request.symbol,
        side: side as 'BUY' | 'SELL',
        type: decision.method === 'spot' ? ('MARKET' as const) : ('LIMIT' as const),
        quantity: adjustedParams.quantity,
        price: adjustedParams.price,
        clientOrderId: `hedge_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      }
    }

    /**
     * Places the same order on the primary exchange and, when present, the
     * backup exchange, bounded by `config.atomicTimeout`.
     *
     * Every order that is actually placed is recorded on `execution`, even
     * when the other leg fails, so that a later rollback can cancel it. An
     * order that arrives after the timeout is still recorded on `execution`
     * but is not rolled back automatically.
     *
     * @throws The first placement error, a timeout error, or an error when an
     *   order comes back with a non-accepted status.
     */
    private async executeAtomicPair(
      orderParams: PlaceOrderReq,
      adapters: AdapterPair,
      execution: HedgeExecution,
    ): Promise<ExecutionResult> {
      let timeoutHandle: ReturnType<typeof setTimeout> | undefined

      try {
        const placements: Promise<Order>[] = [
          adapters.primary.placeOrder(orderParams).then((order: Order) => {
            execution.primaryOrder = order
            return order
          }),
        ]
        if (adapters.secondary) {
          placements.push(
            adapters.secondary.placeOrder(orderParams).then((order: Order) => {
              execution.secondaryOrder = order
              return order
            }),
          )
        }

        const timeout = new Promise<never>((_, reject) => {
          timeoutHandle = setTimeout(() => reject(new Error('原子操作超时')), this.config.atomicTimeout)
        })

        // allSettled (not all): one leg failing must not hide the other leg's
        // successfully placed order.
        const settled = await Promise.race([Promise.allSettled(placements), timeout])

        const orders: Order[] = []
        for (const outcome of settled) {
          if (outcome.status === 'rejected') throw outcome.reason
          orders.push(outcome.value)
        }

        const primaryOrder = orders[0]
        const allAccepted = orders.every(order =>
          UnifiedHedgingExecutor.ACCEPTED_ORDER_STATUSES.includes(order.status),
        )
        if (!primaryOrder || !allAccepted) {
          throw new Error('部分订单失败')
        }

        execution.status = 'completed'
        return {
          success: true,
          orderId: primaryOrder.id,
          executedQuantity: Number(primaryOrder.origQty),
          executedPrice: Number(primaryOrder.price || 0),
        }
      } catch (error: unknown) {
        // 'partial' only when at least one order actually reached an exchange.
        execution.status = execution.primaryOrder || execution.secondaryOrder ? 'partial' : 'failed'
        throw error
      } finally {
        if (timeoutHandle !== undefined) clearTimeout(timeoutHandle)
      }
    }

    /**
     * Cancels the orders recorded on `execution`.
     *
     * The status becomes `rolled_back` (and `execution_rolled_back` is
     * emitted) only when there was something to cancel and every cancel
     * succeeded. When any cancel fails, `rollback_failed` is emitted and the
     * status is left unchanged. Never throws.
     */
    private async attemptRollback(execution: HedgeExecution, adapters: AdapterPair): Promise<void> {
      const symbol = execution.request.symbol

      try {
        const cancellations: Promise<unknown>[] = []
        if (execution.primaryOrder) {
          cancellations.push(adapters.primary.cancelOrder(symbol, execution.primaryOrder.id))
        }
        if (execution.secondaryOrder && adapters.secondary) {
          cancellations.push(adapters.secondary.cancelOrder(symbol, execution.secondaryOrder.id))
        }

        // Nothing reached an exchange, so there is nothing to roll back.
        if (cancellations.length === 0) return

        const outcomes = await Promise.allSettled(cancellations)
        const failed = outcomes.find(
          (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected',
        )
        if (failed) {
          // allSettled never rejects, so cancel failures must be surfaced here.
          throw failed.reason
        }

        execution.status = 'rolled_back'
        this.emit('execution_rolled_back', execution)
        console.info(`🔄 对冲执行 ${execution.id} 已回滚`)
      } catch (rollbackError: unknown) {
        console.error(`❌ 回滚失败 ${execution.id}:`, rollbackError)
        this.emit('rollback_failed', { execution, error: rollbackError })
      }
    }

    /**
     * Validates the primary adapter and, when present, the backup adapter in
     * parallel.
     *
     * @throws Error when any adapter fails validation.
     */
    private async validateAdapters(adapters: AdapterPair): Promise<void> {
      const checks = [this.validateAdapter(adapters.primary, adapters.primaryId)]
      if (adapters.secondary) {
        // Fall back to a fixed label instead of asserting a possibly missing id.
        checks.push(this.validateAdapter(adapters.secondary, adapters.secondaryId ?? 'secondary'))
      }
      await Promise.all(checks)
    }

    /**
     * Validates one adapter by comparing its reported time with the local
     * clock.
     *
     * @throws Error when the time call fails, returns a falsy value, or
     *   differs from local time by more than `MAX_CLOCK_SKEW_MS`.
     */
    private async validateAdapter(adapter: ExchangeAdapter, adapterId: string): Promise<void> {
      try {
        const time = await adapter.time()
        if (!time || Math.abs(Date.now() - time) > UnifiedHedgingExecutor.MAX_CLOCK_SKEW_MS) {
          throw new Error(`${adapterId} 时间同步失败`)
        }
      } catch (error: unknown) {
        const reason = error instanceof Error ? error.message : String(error)
        throw new Error(`${adapterId} 适配器验证失败: ${reason}`)
      }
    }

    /**
     * Adds the primary order's signed quantity to the symbol's net exposure.
     * Does nothing unless the execution completed with a primary order.
     */
    private updateNetExposure(symbol: string, execution: HedgeExecution): void {
      if (execution.status !== 'completed' || !execution.primaryOrder) return

      const previous = this.netExposureBySymbol.get(symbol) ?? 0
      const orderQty = Number(execution.primaryOrder.origQty)
      const delta = execution.primaryOrder.side === 'BUY' ? orderQty : -orderQty
      const next = previous + delta

      execution.netExposure = delta
      this.netExposureBySymbol.set(symbol, next)
      this.emit('net_exposure_updated', { symbol, previous, current: next })
    }

    /**
     * Starts the periodic check that emits `net_exposure_warning` for every
     * symbol whose absolute exposure exceeds the warning threshold.
     */
    private startNetExposureMonitor(): void {
      if (this.exposureMonitor !== undefined) return

      const timer = setInterval(() => {
        for (const [symbol, exposure] of this.netExposureBySymbol.entries()) {
          if (Math.abs(exposure) > UnifiedHedgingExecutor.EXPOSURE_WARNING_THRESHOLD) {
            this.emit('net_exposure_warning', { symbol, exposure })
          }
        }
      }, UnifiedHedgingExecutor.MONITOR_INTERVAL_MS)

      // A background monitor must not be the only thing keeping the process alive.
      UnifiedHedgingExecutor.unrefTimer(timer)
      this.exposureMonitor = timer
    }

    /** Calls `unref()` on a timer handle when the runtime provides it. */
    private static unrefTimer(timer: unknown): void {
      if (typeof timer !== 'object' || timer === null) return
      const unref = (timer as { unref?: unknown }).unref
      if (typeof unref === 'function') unref.call(timer)
    }

    /**
     * Stops the exposure monitor. Safe to call more than once.
     */
    dispose(): void {
      if (this.exposureMonitor !== undefined) {
        clearInterval(this.exposureMonitor)
        this.exposureMonitor = undefined
      }
    }

    /**
     * Places a market order on the primary exchange in the direction that
     * offsets the tracked net exposure for `symbol`.
     *
     * @throws The placement error, or an error when the order comes back with
     *   a non-accepted status. `net_zero_failed` is emitted first and the
     *   tracked exposure is left untouched.
     */
    async forceNetZero(symbol: string, adapters: AdapterPair): Promise<void> {
      const netExposure = this.netExposureBySymbol.get(symbol) ?? 0
      if (Math.abs(netExposure) < UnifiedHedgingExecutor.NET_ZERO_EPSILON) return // already close enough to zero

      console.info(`🎯 强制净零操作: ${symbol}, 当前敞口: ${netExposure}`)

      // Create an order in the opposite direction to neutralise the exposure.
      const side = netExposure > 0 ? 'SELL' : 'BUY'
      const quantity = Math.abs(netExposure)
      const orderParams: PlaceOrderReq = {
        symbol,
        side,
        type: 'MARKET',
        quantity: String(quantity),
        // Random suffix avoids id collisions for calls within the same millisecond.
        clientOrderId: `zero_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      }

      try {
        const order = await adapters.primary.placeOrder(orderParams)
        if (!UnifiedHedgingExecutor.ACCEPTED_ORDER_STATUSES.includes(order.status)) {
          throw new Error(`净零订单未被接受: ${String(order.status)}`)
        }
        // Subtract what was neutralised instead of writing 0, so an exposure
        // update that landed while the order was in flight is not lost.
        const latest = this.netExposureBySymbol.get(symbol) ?? 0
        this.netExposureBySymbol.set(symbol, latest - netExposure)
        this.emit('net_zero_completed', { symbol, previousExposure: netExposure })
      } catch (error: unknown) {
        this.emit('net_zero_failed', { symbol, exposure: netExposure, error })
        throw error
      }
    }

    /**
     * Summarises the recorded executions.
     *
     * @returns Total count, share of completed executions, mean latency of
     *   completed executions, and a copy of the per-symbol exposures. Ratios
     *   are 0 (not NaN) when there is nothing to average.
     */
    getExecutionStats(): {
      totalExecutions: number
      successRate: number
      averageLatency: number
      netExposures: Map<string, number>
    } {
      const all = Array.from(this.executions.values())
      const completed = all.filter(e => e.status === 'completed')
      const totalLatency = completed.reduce((sum, e) => sum + e.executionLatency, 0)

      return {
        totalExecutions: all.length,
        successRate: all.length > 0 ? completed.length / all.length : 0,
        averageLatency: completed.length > 0 ? totalLatency / completed.length : 0,
        netExposures: new Map(this.netExposureBySymbol),
      }
    }

    /**
     * Removes execution records older than `maxAgeMs` (default: 24 hours).
     */
    cleanup(maxAgeMs = 24 * 60 * 60 * 1000): void {
      const cutoff = Date.now() - maxAgeMs
      for (const [id, execution] of this.executions.entries()) {
        if (execution.timestamp < cutoff) {
          this.executions.delete(id)
        }
      }
    }

    /** Builds an execution id from the current time and a random suffix. */
    private generateExecutionId(): string {
      return `hedge_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
    }
  }