import { EventEmitter } from 'events' import { logger } from '../../utils/logger.js' import { BasisManager, basisManager } from './basisManager.js' import { PriceConvergenceManager, priceConvergenceManager } from './priceConvergenceManager.js' import { StopLossManager, stopLossManager } from './stopLossManager.js' import { ConvergenceAlgorithm, convergenceAlgorithm } from './convergenceAlgorithm.js' import { SamePlatformHedgingManager } from './SamePlatformHedgingManager.js' import { PacificaProxyClient } from '../../exchanges/pacifica/PacificaProxyClient.js' /** * 增强型对冲执行器 - 集成基差管理、价格收敛、止盈止损等功能 */ export class EnhancedHedgingExecutor extends EventEmitter { private basisManager: BasisManager private convergenceManager: PriceConvergenceManager private stopLossManager: StopLossManager private convergenceAlgorithm: ConvergenceAlgorithm private platformManager: SamePlatformHedgingManager private clients: Map = new Map() private config: EnhancedHedgingConfig private isActive: boolean = false private executionInterval?: NodeJS.Timeout private performanceMetrics: PerformanceMetrics = { totalTrades: 0, successfulTrades: 0, failedTrades: 0, totalPnl: 0, totalFees: 0, averageConvergenceTime: 0, basisRiskEvents: 0, stopLossTriggered: 0, takeProfitTriggered: 0, maxDrawdown: 0, sharpeRatio: 0, startTime: Date.now(), } constructor(config?: Partial) { super() this.config = { executionIntervalMs: config?.executionIntervalMs || 5000, // 5秒执行间隔 enableBasisManagement: config?.enableBasisManagement ?? true, enableConvergenceManagement: config?.enableConvergenceManagement ?? true, enableStopLossManagement: config?.enableStopLossManagement ?? true, enablePerformanceTracking: config?.enablePerformanceTracking ?? true, maxDailyTrades: config?.maxDailyTrades || 100, maxConcurrentPairs: config?.maxConcurrentPairs || 10, emergencyStopThreshold: config?.emergencyStopThreshold || -0.05, // -5% 紧急停止 riskLimits: { maxPositionSize: config?.riskLimits?.maxPositionSize || 1.0, maxTotalExposure: config?.riskLimits?.maxTotalExposure || 5.0, maxDailyLoss: config?.riskLimits?.maxDailyLoss || 1000, maxBasisDeviation: config?.riskLimits?.maxBasisDeviation || 200, ...config?.riskLimits, }, notifications: { enableSlack: config?.notifications?.enableSlack || false, enableEmail: config?.notifications?.enableEmail || false, enableWebhook: config?.notifications?.enableWebhook || false, ...config?.notifications, }, } // 初始化各个管理器 this.basisManager = config?.basisManager || basisManager this.convergenceManager = config?.convergenceManager || priceConvergenceManager this.stopLossManager = config?.stopLossManager || stopLossManager this.convergenceAlgorithm = config?.convergenceAlgorithm || convergenceAlgorithm this.platformManager = new SamePlatformHedgingManager('pacifica', this.config.riskLimits) this.setupEventListeners() } /** * 设置事件监听器 */ private setupEventListeners(): void { // 基差管理事件 this.basisManager.on('basisAlert', alert => { this.handleBasisAlert(alert) }) // 收敛管理事件 this.convergenceManager.on('convergenceTradeExecuted', event => { this.handleConvergenceTradeExecuted(event) }) // 止损管理事件 this.stopLossManager.on('stopTriggered', event => { this.handleStopTriggered(event) }) // 平台管理事件 this.platformManager.on('rebalanceCompleted', event => { this.handleRebalanceCompleted(event) }) } /** * 添加交易对管理 */ async addTradingPair( pairId: string, account1Config: AccountConfig, account2Config: AccountConfig, symbol: string, options?: PairOptions, ): Promise { try { // 创建交易客户端 const client1 = new PacificaProxyClient({ account: account1Config.account, privateKey: account1Config.privateKey, agentWallet: account1Config.agentWallet, agentPrivateKey: account1Config.agentPrivateKey, }) const client2 = new PacificaProxyClient({ account: account2Config.account, privateKey: account2Config.privateKey, agentWallet: account2Config.agentWallet, agentPrivateKey: account2Config.agentPrivateKey, }) this.clients.set(account1Config.account, client1) this.clients.set(account2Config.account, client2) // 添加到各个管理器 if (this.config.enableConvergenceManagement) { this.convergenceManager.addAccountPair(pairId, account1Config, account2Config, symbol) } if (this.config.enableStopLossManagement) { this.stopLossManager.registerClient(account1Config.account, client1) this.stopLossManager.registerClient(account2Config.account, client2) } // 添加到平台管理器 this.platformManager.addAccount(account1Config.account, account1Config) this.platformManager.addAccount(account2Config.account, account2Config) this.platformManager.createHedgePair(pairId, account1Config.account, account2Config.account, symbol) logger.info('添加交易对成功', { pairId, account1: account1Config.account, account2: account2Config.account, symbol, options, }) this.emit('tradingPairAdded', { pairId, account1: account1Config.account, account2: account2Config.account, symbol, }) } catch (error: any) { logger.error('添加交易对失败', { pairId, error: error.message, }) throw error } } /** * 开始增强对冲执行 */ async startEnhancedHedging(): Promise { if (this.isActive) return this.isActive = true // 启动各个管理器 if (this.config.enableBasisManagement) { this.basisManager.startMonitoring() } if (this.config.enableConvergenceManagement) { this.convergenceManager.startConvergenceMonitoring() } if (this.config.enableStopLossManagement) { this.stopLossManager.startMonitoring() } // 启动主执行循环 this.executionInterval = setInterval(() => { this.performEnhancedExecution() }, this.config.executionIntervalMs) logger.info('增强对冲执行已启动', { enableBasisManagement: this.config.enableBasisManagement, enableConvergenceManagement: this.config.enableConvergenceManagement, enableStopLossManagement: this.config.enableStopLossManagement, executionInterval: this.config.executionIntervalMs, }) this.emit('enhancedHedgingStarted') } /** * 停止增强对冲执行 */ async stopEnhancedHedging(): Promise { if (!this.isActive) return this.isActive = false // 停止各个管理器 this.basisManager.stopMonitoring() this.convergenceManager.stopConvergenceMonitoring() this.stopLossManager.stopMonitoring() // 停止主执行循环 if (this.executionInterval) { clearInterval(this.executionInterval) this.executionInterval = undefined } logger.info('增强对冲执行已停止') this.emit('enhancedHedgingStopped') } /** * 执行增强对冲逻辑 */ private async performEnhancedExecution(): Promise { try { // 检查紧急停止条件 if (await this.checkEmergencyStop()) { await this.triggerEmergencyStop() return } // 更新基差数据 if (this.config.enableBasisManagement) { await this.updateBasisData() } // 执行收敛检查和调整 if (this.config.enableConvergenceManagement) { await this.performConvergenceExecution() } // 更新性能指标 if (this.config.enablePerformanceTracking) { await this.updatePerformanceMetrics() } } catch (error: any) { logger.error('增强对冲执行失败', { error: error.message, }) } } /** * 检查紧急停止条件 */ private async checkEmergencyStop(): Promise { try { const dailyPnl = await this.calculateDailyPnl() const totalDrawdown = await this.calculateCurrentDrawdown() // 检查日亏损限制 if (dailyPnl < -this.config.riskLimits.maxDailyLoss) { logger.warn('触发日亏损限制', { dailyPnl, maxDailyLoss: this.config.riskLimits.maxDailyLoss, }) return true } // 检查回撤限制 if (totalDrawdown < this.config.emergencyStopThreshold) { logger.warn('触发紧急停止阈值', { totalDrawdown, emergencyStopThreshold: this.config.emergencyStopThreshold, }) return true } return false } catch (error: any) { logger.error('检查紧急停止条件失败', { error: error.message, }) return false } } /** * 触发紧急停止 */ private async triggerEmergencyStop(): Promise { logger.warn('触发紧急停止,开始平仓所有仓位') try { // 停止所有自动化执行 await this.stopEnhancedHedging() // 平仓所有仓位 for (const [account, client] of this.clients) { try { const positions = await client.getPositions() for (const position of positions) { if (Math.abs(parseFloat(position.size || '0')) > 0.001) { await this.emergencyClosePosition(account, position) } } } catch (error: any) { logger.error('紧急平仓失败', { account, error: error.message, }) } } this.emit('emergencyStopTriggered', { reason: 'risk_limit_exceeded', timestamp: Date.now(), }) // 发送通知 await this.sendEmergencyNotification() } catch (error: any) { logger.error('紧急停止执行失败', { error: error.message, }) } } /** * 紧急平仓 */ private async emergencyClosePosition(account: string, position: any): Promise { const client = this.clients.get(account) if (!client) return try { const size = Math.abs(parseFloat(position.size || '0')) const isLong = parseFloat(position.size || '0') > 0 const side = isLong ? 'ask' : 'bid' const payload = { account, symbol: position.symbol, amount: size.toString(), side, reduceOnly: true, slippagePercent: '2.0', // 紧急情况允许更大滑点 } const result = await client.createMarketOrder(payload) logger.info('紧急平仓执行成功', { account, symbol: position.symbol, size, side, orderId: result.orderId, }) } catch (error: any) { logger.error('紧急平仓执行失败', { account, position: position.symbol, error: error.message, }) } } /** * 更新基差数据 */ private async updateBasisData(): Promise { try { // 获取所有交易对的现货和期货价格 const symbols = this.getMonitoredSymbols() for (const symbol of symbols) { const marketData = await this.getMarketData(symbol) if (marketData) { // 假设基差为期货价格 - 现货价格(这里简化处理) const spotPrice = marketData.price const futuresPrice = marketData.price * (1 + Math.random() * 0.002 - 0.001) // 模拟期货价格 this.basisManager.updateBasis(symbol, spotPrice, futuresPrice) } } } catch (error: any) { logger.error('更新基差数据失败', { error: error.message, }) } } /** * 执行收敛逻辑 */ private async performConvergenceExecution(): Promise { try { const pairStatuses = this.convergenceManager.getAllAccountPairStatuses() for (const pairStatus of pairStatuses) { if (!pairStatus.account1Position || !pairStatus.account2Position) continue const marketData = await this.getMarketData(pairStatus.symbol) if (!marketData) continue // 使用收敛算法计算策略 const strategy = this.convergenceAlgorithm.calculateConvergenceStrategy( this.convertToPositionInfo(pairStatus.account1Position, marketData), this.convertToPositionInfo(pairStatus.account2Position, marketData), marketData, ) if (strategy.needsAdjustment && strategy.urgency === 'high') { await this.executeConvergenceStrategy(pairStatus.pairId, strategy) } } } catch (error: any) { logger.error('执行收敛逻辑失败', { error: error.message, }) } } /** * 执行收敛策略 */ private async executeConvergenceStrategy(pairId: string, strategy: any): Promise { try { logger.info('执行收敛策略', { pairId, strategy: strategy.strategy, confidence: strategy.confidence, reasoning: strategy.reasoning, }) // 这里可以调用具体的交易执行逻辑 // 暂时使用平台管理器的再平衡功能 await this.platformManager.rebalanceHedgePair(pairId, 0.01) this.performanceMetrics.totalTrades++ this.emit('convergenceStrategyExecuted', { pairId, strategy, timestamp: Date.now(), }) } catch (error: any) { logger.error('执行收敛策略失败', { pairId, error: error.message, }) this.performanceMetrics.failedTrades++ } } /** * 处理基差预警 */ private async handleBasisAlert(alert: any): Promise { logger.warn('基差预警', alert) this.performanceMetrics.basisRiskEvents++ // 发送通知 await this.sendNotification('basis_alert', { symbol: alert.symbol, riskLevel: alert.riskLevel, message: alert.message, }) this.emit('basisRiskDetected', alert) } /** * 处理收敛交易执行 */ private async handleConvergenceTradeExecuted(event: any): Promise { logger.info('收敛交易执行', event) this.performanceMetrics.successfulTrades++ // 为新仓位设置止盈止损 if (this.config.enableStopLossManagement) { await this.setStopLossForNewTrades(event) } } /** * 处理止损触发 */ private async handleStopTriggered(event: any): Promise { logger.info('止损触发', event) if (event.execution.triggerType === 'stop_loss') { this.performanceMetrics.stopLossTriggered++ } else if (event.execution.triggerType === 'take_profit') { this.performanceMetrics.takeProfitTriggered++ } this.emit('stopLossTriggered', event) } /** * 处理再平衡完成 */ private async handleRebalanceCompleted(event: any): Promise { logger.info('再平衡完成', event) this.emit('rebalanceCompleted', event) } /** * 为新交易设置止盈止损 */ private async setStopLossForNewTrades(event: any): Promise { try { for (const result of event.results) { if (result.success && result.executedSize > 0) { await this.stopLossManager.setStopLossAndTakeProfit( result.account, event.tradeParams.symbol || 'BTC-USD', result.executedSize, result.executedPrice, result.side === 'bid', // 买入为多头 ) } } } catch (error: any) { logger.error('设置止盈止损失败', { error: error.message, }) } } /** * 获取市场数据 */ private async getMarketData(symbol: string): Promise { try { // 使用第一个可用的客户端获取市场数据 const client = this.clients.values().next().value if (!client) return null const ticker = await client.getTicker(symbol) return { symbol, price: ticker?.price || 0, volume: ticker?.volume || 0, volatility: 0.02, // 模拟波动率 trend: 'sideways', momentum: 0, } } catch (error: any) { logger.error('获取市场数据失败', { symbol, error: error.message, }) return null } } /** * 转换为算法需要的仓位信息格式 */ private convertToPositionInfo(position: any, marketData: any): any { return { symbol: position.symbol, size: position.size, averagePrice: position.averagePrice || position.entryPrice, currentPrice: marketData.price, unrealizedPnl: position.unrealizedPnl, minOrderSize: 0.001, } } /** * 获取监控的交易对列表 */ private getMonitoredSymbols(): string[] { const pairStatuses = this.convergenceManager.getAllAccountPairStatuses() return pairStatuses.map(status => status.symbol) } /** * 计算日盈亏 */ private async calculateDailyPnl(): Promise { let totalPnl = 0 try { for (const [account, client] of this.clients) { const positions = await client.getPositions() for (const position of positions) { totalPnl += parseFloat(position.unrealizedPnl || '0') } } } catch (error: any) { logger.error('计算日盈亏失败', { error: error.message, }) } return totalPnl } /** * 计算当前回撤 */ private async calculateCurrentDrawdown(): Promise { // 简化实现,实际应该基于历史净值计算 const currentPnl = await this.calculateDailyPnl() return Math.min(0, currentPnl / 10000) // 假设初始资金10000 } /** * 更新性能指标 */ private async updatePerformanceMetrics(): Promise { try { const currentPnl = await this.calculateDailyPnl() this.performanceMetrics.totalPnl = currentPnl const drawdown = await this.calculateCurrentDrawdown() this.performanceMetrics.maxDrawdown = Math.min(this.performanceMetrics.maxDrawdown, drawdown) // 计算成功率 if (this.performanceMetrics.totalTrades > 0) { const successRate = this.performanceMetrics.successfulTrades / this.performanceMetrics.totalTrades logger.debug('性能指标更新', { totalTrades: this.performanceMetrics.totalTrades, successRate: successRate.toFixed(3), totalPnl: this.performanceMetrics.totalPnl.toFixed(2), maxDrawdown: this.performanceMetrics.maxDrawdown.toFixed(3), }) } } catch (error: any) { logger.error('更新性能指标失败', { error: error.message, }) } } /** * 发送通知 */ private async sendNotification(type: string, data: any): Promise { try { if (this.config.notifications.enableWebhook) { // 发送 Webhook 通知 logger.info('发送通知', { type, data }) } this.emit('notification', { type, data, timestamp: Date.now() }) } catch (error: any) { logger.error('发送通知失败', { error: error.message, }) } } /** * 发送紧急通知 */ private async sendEmergencyNotification(): Promise { await this.sendNotification('emergency_stop', { message: '系统触发紧急停止', performanceMetrics: this.performanceMetrics, }) } /** * 获取性能指标 */ getPerformanceMetrics(): PerformanceMetrics { return { ...this.performanceMetrics } } /** * 获取运行状态 */ getStatus(): HedgingExecutorStatus { return { isActive: this.isActive, activePairs: this.convergenceManager.getAllAccountPairStatuses().length, activeStopOrders: this.stopLossManager.getActiveStopOrders().length, performanceMetrics: this.performanceMetrics, config: this.config, } } } // 类型定义 export interface AccountConfig { account: string privateKey: string agentWallet?: string agentPrivateKey?: string } export interface PairOptions { maxPositionSize?: number stopLossPercent?: number takeProfitPercent?: number enableTrailingStop?: boolean } export interface EnhancedHedgingConfig { executionIntervalMs: number enableBasisManagement: boolean enableConvergenceManagement: boolean enableStopLossManagement: boolean enablePerformanceTracking: boolean maxDailyTrades: number maxConcurrentPairs: number emergencyStopThreshold: number riskLimits: { maxPositionSize: number maxTotalExposure: number maxDailyLoss: number maxBasisDeviation: number } notifications: { enableSlack: boolean enableEmail: boolean enableWebhook: boolean } basisManager?: BasisManager convergenceManager?: PriceConvergenceManager stopLossManager?: StopLossManager convergenceAlgorithm?: ConvergenceAlgorithm } export interface PerformanceMetrics { totalTrades: number successfulTrades: number failedTrades: number totalPnl: number totalFees: number averageConvergenceTime: number basisRiskEvents: number stopLossTriggered: number takeProfitTriggered: number maxDrawdown: number sharpeRatio: number startTime: number } export interface HedgingExecutorStatus { isActive: boolean activePairs: number activeStopOrders: number performanceMetrics: PerformanceMetrics config: EnhancedHedgingConfig } export const enhancedHedgingExecutor = new EnhancedHedgingExecutor()