import { EventEmitter } from 'events'
import { logger } from '../../utils/logger.js'
import { PacificaProxyClient } from '../../exchanges/pacifica/PacificaProxyClient.js'
import { SamePlatformHedgingManager } from './SamePlatformHedgingManager.js'

/** Run the convergence check every N main cycles (15 s at the default 5 s interval). */
const CONVERGENCE_CHECK_EVERY = 3
/** Run the basis analysis every N main cycles (30 s at the default interval). */
const BASIS_ANALYSIS_EVERY = 6
/** Run the health check and metrics refresh every N main cycles (60 s at the default interval). */
const HEALTH_CHECK_EVERY = 12
/**
 * Cycle counter wrap-around point. It must be a common multiple of the three
 * cadences above, otherwise the periodic tasks drift each time the counter wraps.
 */
const CYCLE_COUNTER_WRAP = 120

/** Extracts a loggable message from an arbitrary thrown value. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/**
 * Optimized unified hedging system.
 *
 * Integrates basis management, price convergence and stop-loss / take-profit
 * configuration into the existing architecture, driven by a single timer.
 *
 * Events emitted: `systemStarted`, `systemStopped`, `systemError`, `riskAlert`,
 * `emergencyStop`, `convergenceAdjustment`, `basisAlert`.
 */
export class OptimizedHedgingSystem extends EventEmitter {
  private readonly platformManager: SamePlatformHedgingManager
  private readonly dataCache = new Map<string, CacheEntry>()
  private readonly config: OptimizedConfig
  private mainInterval?: NodeJS.Timeout
  private cycleCount = 0
  /** True while a main cycle is executing; prevents overlapping cycles. */
  private cycleInProgress = false

  // Integrated state management
  private readonly accountStates = new Map<string, EnhancedAccountState>()
  private readonly basisData = new Map<string, BasisDataPoint[]>()
  private readonly performanceMetrics: PerformanceMetrics

  /**
   * @param config Optional overrides; any section that is omitted falls back to
   *   the defaults below. `??` is used so that explicit `0` / `false` values are
   *   respected instead of being replaced by the default.
   */
  constructor(config?: Partial<OptimizedConfig>) {
    super()

    const basis = config?.basisConfig
    const convergence = config?.convergenceConfig
    const stopLoss = config?.stopLossConfig
    const risk = config?.riskLimits

    this.config = {
      // `||` is deliberate here: 0 ms is not a usable timer interval.
      mainCycleIntervalMs: config?.mainCycleIntervalMs || 5000, // unified 5 s interval
      cacheTimeoutMs: config?.cacheTimeoutMs ?? 2000, // 2 s cache
      basisConfig: {
        maxDeviation: basis?.maxDeviation ?? 200,
        alertThreshold: basis?.alertThreshold ?? 100,
        historyRetentionHours: basis?.historyRetentionHours ?? 24,
      },
      convergenceConfig: {
        maxPriceDeviation: convergence?.maxPriceDeviation ?? 0.005,
        convergenceStepPercent: convergence?.convergenceStepPercent ?? 0.001,
        maxDailyAdjustments: convergence?.maxDailyAdjustments ?? 20,
      },
      stopLossConfig: {
        defaultStopLoss: stopLoss?.defaultStopLoss ?? 0.015,
        defaultTakeProfit: stopLoss?.defaultTakeProfit ?? 0.025,
        // Previously `|| true`, which made it impossible to disable trailing.
        enableTrailing: stopLoss?.enableTrailing ?? true,
      },
      riskLimits: {
        // Spread first so any extra caller-supplied keys are passed through,
        // then fill the known keys with caller value or default.
        ...risk,
        maxPositionSize: risk?.maxPositionSize ?? 0.01,
        maxTotalExposure: risk?.maxTotalExposure ?? 0.05,
        emergencyStopThreshold: risk?.emergencyStopThreshold ?? -0.03,
      },
    }

    this.platformManager = new SamePlatformHedgingManager('pacifica', this.config.riskLimits)
    this.performanceMetrics = this.initializeMetrics()
  }

  /**
   * Starts the hedging system by scheduling the single main-loop timer.
   * Calling it while already running is a no-op (the previous implementation
   * overwrote the timer handle and leaked the first interval).
   */
  async start(): Promise<void> {
    if (this.mainInterval) {
      logger.warn('优化对冲系统已在运行,忽略重复启动')
      return
    }

    logger.info('启动优化对冲系统')

    // Single main-loop timer. executeMainCycle handles its own errors, so the
    // returned promise is intentionally not awaited.
    this.mainInterval = setInterval(() => {
      void this.executeMainCycle()
    }, this.config.mainCycleIntervalMs)

    logger.info('优化对冲系统已启动', {
      interval: this.config.mainCycleIntervalMs,
      cacheTimeout: this.config.cacheTimeoutMs,
    })

    this.emit('systemStarted')
  }

  /**
   * Stops the system: clears the main-loop timer (if any) and emits
   * `systemStopped`.
   */
  async stop(): Promise<void> {
    if (this.mainInterval) {
      clearInterval(this.mainInterval)
      this.mainInterval = undefined
    }

    logger.info('优化对冲系统已停止')
    this.emit('systemStopped')
  }

  /**
   * Registers a hedge pair made of two accounts trading the same symbol.
   *
   * @param pairId Identifier of the hedge pair.
   * @param account1Config First account of the pair.
   * @param account2Config Second account of the pair.
   * @param symbol Symbol the pair trades.
   */
  async addTradingPair(
    pairId: string,
    account1Config: AccountConfig,
    account2Config: AccountConfig,
    symbol: string,
  ): Promise<void> {
    // Register the accounts and the pair with the existing platform manager.
    this.platformManager.addAccount(account1Config.account, account1Config)
    this.platformManager.addAccount(account2Config.account, account2Config)
    this.platformManager.createHedgePair(pairId, account1Config.account, account2Config.account, symbol)

    // Initialise the enhanced state tracking for both accounts.
    for (const { account } of [account1Config, account2Config]) {
      this.accountStates.set(account, this.createEnhancedAccountState(account))
    }

    logger.info('添加交易对到优化系统', { pairId, symbol })
  }

  /**
   * Main execution loop; replaces several independent timers.
   *
   * A tick is skipped when the previous cycle is still running, and the rest of
   * a cycle is abandoned once the system has been stopped (for example by an
   * emergency stop triggered from the risk check).
   */
  private async executeMainCycle(): Promise<void> {
    if (this.cycleInProgress) {
      logger.warn('上一主循环尚未完成,跳过本次周期')
      return
    }
    this.cycleInProgress = true

    try {
      this.cycleCount++

      // Core work executed on every cycle.
      await this.updateCachedData()
      await this.performIntegratedRiskCheck()

      // The risk check may have stopped the system; do not trade after that.
      if (!this.mainInterval) {
        return
      }

      await this.checkAndExecuteStrategies()

      // Lower-frequency tasks, selected by cycle number.
      if (this.cycleCount % CONVERGENCE_CHECK_EVERY === 0) {
        await this.executeConvergenceCheck()
      }

      if (this.cycleCount % BASIS_ANALYSIS_EVERY === 0) {
        await this.executeBasisAnalysis()
      }

      if (this.cycleCount % HEALTH_CHECK_EVERY === 0) {
        await this.performSystemHealthCheck()
        await this.updatePerformanceMetrics()
      }

      // Wrap the counter on a common multiple of all cadences so the periodic
      // tasks keep a constant spacing across the wrap.
      if (this.cycleCount >= CYCLE_COUNTER_WRAP) {
        this.cycleCount = 0
      }
    } catch (error: unknown) {
      logger.error('主循环执行失败', { error: describeError(error), cycle: this.cycleCount })
      this.emit('systemError', { error, cycle: this.cycleCount })
    } finally {
      this.cycleInProgress = false
    }
  }

  /**
   * Refreshes all account data and the market data in parallel. Individual
   * failures do not abort the others (`Promise.allSettled`).
   */
  private async updateCachedData(): Promise<void> {
    const tasks: Promise<void>[] = [...this.accountStates.keys()].map(accountId =>
      this.updateAccountData(accountId),
    )
    tasks.push(this.updateMarketData())

    await Promise.allSettled(tasks)
  }

  /**
   * Refreshes a single account through the platform manager, at most once per
   * cache window. Failures are logged and swallowed (best effort).
   */
  private async updateAccountData(accountId: string): Promise<void> {
    const cacheKey = `account_${accountId}`
    if (this.getFromCache(cacheKey) !== null) {
      return
    }

    try {
      await this.platformManager.updateAccountPositions(accountId)

      const accountState = this.accountStates.get(accountId)
      if (accountState) {
        // Further state updates (positions, balances) can be added here.
        accountState.lastUpdate = Date.now()
      }

      this.setCache(cacheKey, true)
    } catch (error: unknown) {
      logger.warn('更新账户数据失败', { accountId, error: describeError(error) })
    }
  }

  /**
   * Refreshes market data, at most once per cache window. Currently only marks
   * the cache entry; the price-fetching logic is still to be plugged in.
   */
  private async updateMarketData(): Promise<void> {
    const cacheKey = 'market_data'
    if (this.getFromCache(cacheKey) !== null) {
      return
    }

    try {
      // Placeholder: call the existing price-fetching logic here.
      this.setCache(cacheKey, true)
    } catch (error: unknown) {
      logger.warn('更新市场数据失败', { error: describeError(error) })
    }
  }

  /**
   * Combines hedge-pair status, basis risk and convergence risk into one
   * assessment. Emits `riskAlert` for HIGH / CRITICAL and additionally triggers
   * an emergency stop for CRITICAL. Errors are logged, never thrown.
   */
  private async performIntegratedRiskCheck(): Promise<void> {
    try {
      // 1. Base risk input (reuses the existing platform-manager state)
      const hedgeStatuses = this.platformManager.getHedgePairStatuses()
      // 2. Basis risk
      const basisRisk = await this.checkBasisRisk()
      // 3. Convergence risk
      const convergenceRisk = await this.checkConvergenceRisk()
      // 4. Combined assessment
      const overallRisk = this.calculateOverallRisk(hedgeStatuses, basisRisk, convergenceRisk)

      if (overallRisk.level !== 'HIGH' && overallRisk.level !== 'CRITICAL') {
        return
      }

      this.emit('riskAlert', overallRisk)

      if (overallRisk.level === 'CRITICAL') {
        await this.triggerEmergencyStop()
      }
    } catch (error: unknown) {
      logger.error('集成风险检查失败', { error: describeError(error) })
    }
  }

  /**
   * Compares the latest recorded basis of every monitored symbol against the
   * configured thresholds.
   *
   * NOTE(review): any event, including MEDIUM severity ones, makes the overall
   * result 'HIGH'. This matches the previous behaviour; confirm it is intended.
   */
  private async checkBasisRisk(): Promise<BasisRiskAssessment> {
    const { maxDeviation, alertThreshold } = this.config.basisConfig
    const riskEvents: BasisRiskEvent[] = []

    for (const symbol of this.getMonitoredSymbols()) {
      const history = this.basisData.get(symbol)
      const latest = history?.[history.length - 1]
      if (!latest) {
        continue
      }

      const magnitude = Math.abs(latest.basis)
      if (magnitude > maxDeviation) {
        riskEvents.push({
          symbol,
          basis: latest.basis,
          threshold: maxDeviation,
          severity: 'HIGH',
          timestamp: Date.now(),
        })
      } else if (magnitude > alertThreshold) {
        riskEvents.push({
          symbol,
          basis: latest.basis,
          threshold: alertThreshold,
          severity: 'MEDIUM',
          timestamp: Date.now(),
        })
      }
    }

    return {
      overallRisk: riskEvents.length > 0 ? 'HIGH' : 'LOW',
      events: riskEvents,
      timestamp: Date.now(),
    }
  }

  /**
   * Flags every hedge pair whose absolute net exposure exceeds
   * `convergenceConfig.maxPriceDeviation` (net exposure is used as a simplified
   * stand-in for the price deviation).
   */
  private async checkConvergenceRisk(): Promise<ConvergenceRiskAssessment> {
    const threshold = this.config.convergenceConfig.maxPriceDeviation
    const riskEvents: ConvergenceRiskEvent[] = []

    for (const status of this.platformManager.getHedgePairStatuses()) {
      const deviation = Math.abs(status.netExposure) // simplified deviation
      if (deviation > threshold) {
        riskEvents.push({
          pairId: status.pairId,
          deviation,
          threshold,
          severity: 'HIGH',
          timestamp: Date.now(),
        })
      }
    }

    return {
      overallRisk: riskEvents.length > 0 ? 'HIGH' : 'LOW',
      events: riskEvents,
      timestamp: Date.now(),
    }
  }

  /**
   * Produces a weighted risk score (basis 40, convergence 35, inactive hedges 25)
   * and maps it to a level: >= 80 CRITICAL, >= 60 HIGH, >= 30 MEDIUM, else LOW.
   *
   * @param hedgeStatuses Hedge pair statuses; only `isActive` is read.
   * @param basisRisk Result of the basis risk check.
   * @param convergenceRisk Result of the convergence risk check.
   */
  private calculateOverallRisk(
    hedgeStatuses: ReadonlyArray<{ isActive?: unknown }>,
    basisRisk: BasisRiskAssessment,
    convergenceRisk: ConvergenceRiskAssessment,
  ): OverallRiskAssessment {
    let riskScore = 0
    const factors: string[] = []

    // Basis risk: weight 40
    if (basisRisk.overallRisk === 'HIGH') {
      riskScore += 40
      factors.push(`基差风险高:${basisRisk.events.length}个事件`)
    }

    // Convergence risk: weight 35
    if (convergenceRisk.overallRisk === 'HIGH') {
      riskScore += 35
      factors.push(`收敛风险高:${convergenceRisk.events.length}个事件`)
    }

    // Hedge status risk: weight 25
    const inactiveHedges = hedgeStatuses.filter(hedge => !hedge.isActive).length
    if (inactiveHedges > 0) {
      riskScore += 25
      factors.push(`${inactiveHedges}个对冲对未激活`)
    }

    let level: OverallRiskAssessment['level']
    if (riskScore >= 80) {
      level = 'CRITICAL'
    } else if (riskScore >= 60) {
      level = 'HIGH'
    } else if (riskScore >= 30) {
      level = 'MEDIUM'
    } else {
      level = 'LOW'
    }

    return {
      level,
      score: riskScore,
      factors,
      basisRisk,
      convergenceRisk,
      timestamp: Date.now(),
    }
  }

  /**
   * Rebalances every hedge pair whose absolute net exposure exceeds the
   * configured limit and emits `convergenceAdjustment` for each one.
   *
   * Errors are handled per pair so one failing rebalance does not prevent the
   * remaining pairs from being processed.
   */
  private async executeConvergenceCheck(): Promise<void> {
    try {
      const limit = this.config.convergenceConfig.maxPriceDeviation

      for (const status of this.platformManager.getHedgePairStatuses()) {
        if (Math.abs(status.netExposure) <= limit) {
          continue
        }

        this.performanceMetrics.totalAdjustments++
        try {
          await this.platformManager.rebalanceHedgePair(status.pairId, 0.01)
        } catch (error: unknown) {
          logger.error('收敛检查失败', { pairId: status.pairId, error: describeError(error) })
          continue
        }

        // Counted as successful when the call resolves without throwing.
        // NOTE(review): confirm rebalanceHedgePair does not signal failure via
        // its return value.
        this.performanceMetrics.successfulAdjustments++
        this.performanceMetrics.convergenceEvents++

        this.emit('convergenceAdjustment', {
          pairId: status.pairId,
          netExposure: status.netExposure,
          timestamp: Date.now(),
        })
      }
    } catch (error: unknown) {
      logger.error('收敛检查失败', { error: describeError(error) })
    }
  }

  /**
   * Fetches spot and futures prices for every monitored symbol and records a
   * basis data point when both prices are available (non-zero).
   */
  private async executeBasisAnalysis(): Promise<void> {
    try {
      for (const symbol of this.getMonitoredSymbols()) {
        // The two price lookups are independent, so fetch them in parallel.
        const [spotPrice, futuresPrice] = await Promise.all([
          this.getSpotPrice(symbol),
          this.getFuturesPrice(symbol),
        ])

        if (spotPrice && futuresPrice) {
          this.updateBasisData(symbol, spotPrice, futuresPrice)
        }
      }
    } catch (error: unknown) {
      logger.error('基差分析失败', { error: describeError(error) })
    }
  }

  /**
   * Appends a basis data point for `symbol`, trims the history to the
   * configured retention window and emits `basisAlert` when the absolute basis
   * exceeds the alert threshold.
   *
   * @param symbol Symbol the prices belong to.
   * @param spotPrice Spot price; must be non-zero (used as divisor).
   * @param futuresPrice Futures price.
   */
  private updateBasisData(symbol: string, spotPrice: number, futuresPrice: number): void {
    let history = this.basisData.get(symbol)
    if (!history) {
      history = []
      this.basisData.set(symbol, history)
    }

    const now = Date.now()
    const basis = futuresPrice - spotPrice
    const basisPercent = (basis / spotPrice) * 100

    history.push({ spotPrice, futuresPrice, basis, basisPercent, timestamp: now })

    // Trim by age rather than by point count: points are appended on a slower
    // cadence than the main cycle, so a count derived from the cycle interval
    // kept far more than `historyRetentionHours` of data.
    const cutoff = now - this.config.basisConfig.historyRetentionHours * 3600 * 1000
    const firstKept = history.findIndex(point => point.timestamp >= cutoff)
    if (firstKept > 0) {
      history.splice(0, firstKept)
    }

    const { alertThreshold } = this.config.basisConfig
    if (Math.abs(basis) > alertThreshold) {
      this.performanceMetrics.basisAlerts++
      this.emit('basisAlert', {
        symbol,
        basis,
        basisPercent,
        threshold: alertThreshold,
        timestamp: now,
      })
    }
  }

  /**
   * Returns the cached payload for `key`, or `null` when there is no entry or
   * the entry is older than `cacheTimeoutMs`.
   */
  private getFromCache(key: string): unknown {
    const entry = this.dataCache.get(key)
    if (entry && Date.now() - entry.timestamp < this.config.cacheTimeoutMs) {
      return entry.data
    }
    return null
  }

  /** Stores `data` under `key`, stamped with the current time. */
  private setCache(key: string, data: unknown): void {
    this.dataCache.set(key, { data, timestamp: Date.now() })
  }

  /** Builds the initial enhanced tracking state for an account. */
  private createEnhancedAccountState(accountId: string): EnhancedAccountState {
    return {
      accountId,
      lastUpdate: Date.now(),
      convergenceHistory: [],
      stopLossOrders: [],
      riskScore: 0,
    }
  }

  /** Builds zeroed metrics; `systemUptime` starts as the creation timestamp. */
  private initializeMetrics(): PerformanceMetrics {
    return {
      totalAdjustments: 0,
      successfulAdjustments: 0,
      basisAlerts: 0,
      convergenceEvents: 0,
      systemUptime: Date.now(),
    }
  }

  /**
   * Returns the distinct symbols of all registered hedge pairs. De-duplicated
   * so that several pairs on the same symbol do not produce duplicate basis
   * data points or duplicate risk events.
   */
  private getMonitoredSymbols(): string[] {
    const symbols = this.platformManager.getHedgePairStatuses().map(status => status.symbol)
    return [...new Set<string>(symbols)]
  }

  /** Placeholder spot price source; returns a fixed mock price. */
  private async getSpotPrice(_symbol: string): Promise<number> {
    // TODO: implement the real spot price lookup.
    return 50000 // mock price
  }

  /** Placeholder futures price source; returns a fixed mock price. */
  private async getFuturesPrice(_symbol: string): Promise<number> {
    // TODO: implement the real futures price lookup.
    return 50100 // mock price
  }

  /** Placeholder system health check; currently only logs. */
  private async performSystemHealthCheck(): Promise<void> {
    logger.debug('系统健康检查完成')
  }

  /**
   * Refreshes the performance metrics.
   *
   * NOTE(review): this overwrites `systemUptime` (initialised with the creation
   * timestamp) with the current time, so the field holds "last refresh time"
   * rather than a duration or a start time. Behaviour kept as-is; confirm intent.
   */
  private async updatePerformanceMetrics(): Promise<void> {
    this.performanceMetrics.systemUptime = Date.now()
  }

  /** Placeholder for strategy evaluation and execution. */
  private async checkAndExecuteStrategies(): Promise<void> {
    // TODO: implement strategy checks and execution.
  }

  /** Stops the system and then emits `emergencyStop`. */
  private async triggerEmergencyStop(): Promise<void> {
    logger.warn('触发紧急停止')
    await this.stop()
    this.emit('emergencyStop', { timestamp: Date.now() })
  }

  /**
   * Returns a snapshot of the system status.
   *
   * @returns Running flag, pair/account counts, metrics, cache size and the
   *   current cycle counter.
   */
  getSystemStatus(): OptimizedSystemStatus {
    return {
      isRunning: !!this.mainInterval,
      activePairs: this.platformManager.getHedgePairStatuses().length,
      accountCount: this.accountStates.size,
      performanceMetrics: this.performanceMetrics,
      cacheEntries: this.dataCache.size,
      cycleCount: this.cycleCount,
    }
  }
}

// Type definitions

/** Credentials and identifiers of one trading account. */
export interface AccountConfig {
  account: string
  privateKey: string
  agentWallet?: string
  agentPrivateKey?: string
}

/** Full configuration of the optimized hedging system. */
export interface OptimizedConfig {
  /** Interval of the main loop timer, in milliseconds. */
  mainCycleIntervalMs: number
  /** Maximum age of a cache entry, in milliseconds. */
  cacheTimeoutMs: number
  basisConfig: {
    maxDeviation: number
    alertThreshold: number
    historyRetentionHours: number
  }
  convergenceConfig: {
    maxPriceDeviation: number
    convergenceStepPercent: number
    maxDailyAdjustments: number
  }
  stopLossConfig: {
    defaultStopLoss: number
    defaultTakeProfit: number
    enableTrailing: boolean
  }
  riskLimits: {
    maxPositionSize: number
    maxTotalExposure: number
    emergencyStopThreshold: number
  }
}

/** A cached payload together with the time it was stored. */
export interface CacheEntry {
  data: any
  timestamp: number
}

/** Per-account tracking state kept in addition to the platform manager's state. */
export interface EnhancedAccountState {
  accountId: string
  lastUpdate: number
  convergenceHistory: any[]
  stopLossOrders: any[]
  riskScore: number
}

/** One spot/futures observation; `basis` is `futuresPrice - spotPrice`. */
export interface BasisDataPoint {
  spotPrice: number
  futuresPrice: number
  basis: number
  basisPercent: number
  timestamp: number
}

/** A basis threshold violation for one symbol. */
export interface BasisRiskEvent {
  symbol: string
  basis: number
  threshold: number
  severity: 'LOW' | 'MEDIUM' | 'HIGH'
  timestamp: number
}

/** A convergence threshold violation for one hedge pair. */
export interface ConvergenceRiskEvent {
  pairId: string
  deviation: number
  threshold: number
  severity: 'LOW' | 'MEDIUM' | 'HIGH'
  timestamp: number
}

/** Result of the basis risk check. */
export interface BasisRiskAssessment {
  overallRisk: 'LOW' | 'HIGH'
  events: BasisRiskEvent[]
  timestamp: number
}

/** Result of the convergence risk check. */
export interface ConvergenceRiskAssessment {
  overallRisk: 'LOW' | 'HIGH'
  events: ConvergenceRiskEvent[]
  timestamp: number
}

/** Combined risk assessment with its weighted score and contributing factors. */
export interface OverallRiskAssessment {
  level: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL'
  score: number
  factors: string[]
  basisRisk: BasisRiskAssessment
  convergenceRisk: ConvergenceRiskAssessment
  timestamp: number
}

/** Counters describing the system's activity. */
export interface PerformanceMetrics {
  totalAdjustments: number
  successfulAdjustments: number
  basisAlerts: number
  convergenceEvents: number
  systemUptime: number
}

/** Snapshot returned by `getSystemStatus()`. */
export interface OptimizedSystemStatus {
  isRunning: boolean
  activePairs: number
  accountCount: number
  performanceMetrics: PerformanceMetrics
  cacheEntries: number
  cycleCount: number
}