| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646 |
- import { EventEmitter } from 'events'
- import { logger } from '../../utils/logger.js'
- import { PacificaProxyClient } from '../../exchanges/pacifica/PacificaProxyClient.js'
- import { SamePlatformHedgingManager } from './SamePlatformHedgingManager.js'
/**
 * Optimized unified hedging system.
 *
 * Runs basis monitoring, convergence rebalancing and an integrated risk check from a
 * single timer-driven main loop, delegating account and hedge-pair handling to
 * SamePlatformHedgingManager.
 *
 * Events emitted: `systemStarted`, `systemStopped`, `systemError`, `riskAlert`,
 * `emergencyStop`, `convergenceAdjustment`, `basisAlert`.
 *
 * NOTE(review): `stopLossConfig`, `convergenceStepPercent` and `maxDailyAdjustments` are
 * stored in the config but nothing in this class reads them yet.
 */
export class OptimizedHedgingSystem extends EventEmitter {
  /**
   * Value at which the cycle counter wraps back to zero. It must be a common multiple of
   * every cadence used by the main loop (3, 6 and 12) so that the periodic tasks keep an
   * even rhythm across the wrap.
   */
  private static readonly CYCLE_WRAP = 120

  private platformManager: SamePlatformHedgingManager
  private dataCache: Map<string, CacheEntry> = new Map()
  private config: OptimizedConfig
  private mainInterval?: NodeJS.Timeout
  private cycleCount: number = 0
  /** True while a main cycle is executing; used to skip overlapping timer ticks. */
  private cycleInProgress = false
  // Consolidated state tracking
  private accountStates: Map<string, EnhancedAccountState> = new Map()
  private basisData: Map<string, BasisDataPoint[]> = new Map()
  private performanceMetrics: PerformanceMetrics

  /**
   * @param config Optional overrides; every omitted value falls back to a built-in default.
   */
  constructor(config?: Partial<OptimizedConfig>) {
    super()
    this.config = OptimizedHedgingSystem.buildConfig(config)
    this.platformManager = new SamePlatformHedgingManager('pacifica', this.config.riskLimits)
    this.performanceMetrics = this.initializeMetrics()
  }

  /**
   * Merges caller overrides with the defaults.
   *
   * Uses `??` rather than `||` so that explicit falsy values (`0`, `false`) are honoured;
   * in particular `enableTrailing: false` must be able to switch trailing off.
   */
  private static buildConfig(config?: Partial<OptimizedConfig>): OptimizedConfig {
    const interval = config?.mainCycleIntervalMs
    return {
      // A zero/negative/NaN interval would make the timer spin, so it falls back to 5s.
      mainCycleIntervalMs: interval !== undefined && interval > 0 ? interval : 5000,
      cacheTimeoutMs: config?.cacheTimeoutMs ?? 2000,
      basisConfig: {
        maxDeviation: config?.basisConfig?.maxDeviation ?? 200,
        alertThreshold: config?.basisConfig?.alertThreshold ?? 100,
        historyRetentionHours: config?.basisConfig?.historyRetentionHours ?? 24,
      },
      convergenceConfig: {
        maxPriceDeviation: config?.convergenceConfig?.maxPriceDeviation ?? 0.005,
        convergenceStepPercent: config?.convergenceConfig?.convergenceStepPercent ?? 0.001,
        maxDailyAdjustments: config?.convergenceConfig?.maxDailyAdjustments ?? 20,
      },
      stopLossConfig: {
        defaultStopLoss: config?.stopLossConfig?.defaultStopLoss ?? 0.015,
        defaultTakeProfit: config?.stopLossConfig?.defaultTakeProfit ?? 0.025,
        enableTrailing: config?.stopLossConfig?.enableTrailing ?? true,
      },
      riskLimits: {
        // Spread first so any extra caller-supplied keys survive, while the named limits
        // below can never end up `undefined`.
        ...config?.riskLimits,
        maxPositionSize: config?.riskLimits?.maxPositionSize ?? 0.01,
        maxTotalExposure: config?.riskLimits?.maxTotalExposure ?? 0.05,
        emergencyStopThreshold: config?.riskLimits?.emergencyStopThreshold ?? -0.03,
      },
    }
  }

  /** Extracts a printable message from whatever was thrown. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }

  /**
   * Starts the main loop timer and emits `systemStarted`.
   * Calling it while already running is a no-op, so a second timer is never leaked.
   */
  async start(): Promise<void> {
    if (this.mainInterval) {
      logger.warn('优化对冲系统已在运行,忽略重复启动')
      return
    }
    logger.info('启动优化对冲系统')
    // Single main-loop timer
    this.mainInterval = setInterval(() => {
      void this.executeMainCycle()
    }, this.config.mainCycleIntervalMs)
    logger.info('优化对冲系统已启动', {
      interval: this.config.mainCycleIntervalMs,
      cacheTimeout: this.config.cacheTimeoutMs,
    })
    this.emit('systemStarted')
  }

  /**
   * Stops the main loop timer (if any) and emits `systemStopped`.
   */
  async stop(): Promise<void> {
    if (this.mainInterval) {
      clearInterval(this.mainInterval)
      this.mainInterval = undefined
    }
    logger.info('优化对冲系统已停止')
    this.emit('systemStopped')
  }

  /**
   * Registers two accounts with the platform manager, links them as a hedge pair for
   * `symbol`, and starts tracking enhanced state for both accounts.
   *
   * @param pairId Identifier of the hedge pair.
   * @param account1Config First account of the pair.
   * @param account2Config Second account of the pair.
   * @param symbol Symbol the pair hedges.
   */
  async addTradingPair(
    pairId: string,
    account1Config: AccountConfig,
    account2Config: AccountConfig,
    symbol: string,
  ): Promise<void> {
    // Register accounts and the pair with the existing platform manager
    this.platformManager.addAccount(account1Config.account, account1Config)
    this.platformManager.addAccount(account2Config.account, account2Config)
    this.platformManager.createHedgePair(pairId, account1Config.account, account2Config.account, symbol)
    // Initialise enhanced state tracking
    for (const accountId of [account1Config.account, account2Config.account]) {
      this.accountStates.set(accountId, this.createEnhancedAccountState(accountId))
    }
    logger.info('添加交易对到优化系统', { pairId, symbol })
  }

  /**
   * One tick of the main loop (replaces several independent timers).
   *
   * Every tick: refresh cached data, run the integrated risk check, run strategies.
   * Every 3rd tick: convergence check. Every 6th: basis analysis. Every 12th: health
   * check and metrics. A tick is skipped while the previous one is still running, and the
   * rest of a tick is skipped once the risk check has triggered an emergency stop.
   */
  private async executeMainCycle(): Promise<void> {
    if (this.cycleInProgress) {
      logger.warn('上一轮主循环尚未完成,跳过本次执行', { cycle: this.cycleCount })
      return
    }
    this.cycleInProgress = true
    try {
      this.cycleCount++
      // Core work performed on every cycle
      await this.updateCachedData()
      const halted = await this.performIntegratedRiskCheck()
      if (halted) {
        // Emergency stop fired: do not trade or rebalance in this cycle.
        return
      }
      await this.checkAndExecuteStrategies()
      // Lower-frequency tasks keyed off the cycle counter
      if (this.cycleCount % 3 === 0) {
        await this.executeConvergenceCheck()
      }
      if (this.cycleCount % 6 === 0) {
        await this.executeBasisAnalysis()
      }
      if (this.cycleCount % 12 === 0) {
        await this.performSystemHealthCheck()
        await this.updatePerformanceMetrics()
      }
      if (this.cycleCount >= OptimizedHedgingSystem.CYCLE_WRAP) {
        this.cycleCount = 0
      }
    } catch (error: unknown) {
      logger.error('主循环执行失败', {
        error: OptimizedHedgingSystem.describeError(error),
        cycle: this.cycleCount,
      })
      this.emit('systemError', { error, cycle: this.cycleCount })
    } finally {
      this.cycleInProgress = false
    }
  }

  /**
   * Refreshes all account data and the market data in parallel; individual failures do
   * not reject because the results are collected with `Promise.allSettled`.
   */
  private async updateCachedData(): Promise<void> {
    const tasks: Promise<void>[] = Array.from(this.accountStates.keys(), accountId =>
      this.updateAccountData(accountId),
    )
    tasks.push(this.updateMarketData())
    await Promise.allSettled(tasks)
  }

  /**
   * Refreshes one account's positions unless a fresh cache marker exists.
   * Failures are logged and swallowed so one account cannot break the cycle.
   */
  private async updateAccountData(accountId: string): Promise<void> {
    const cacheKey = `account_${accountId}`
    if (this.getFromCache(cacheKey)) {
      return
    }
    try {
      await this.platformManager.updateAccountPositions(accountId)
      const accountState = this.accountStates.get(accountId)
      if (accountState) {
        accountState.lastUpdate = Date.now()
        // Further per-account state updates can be added here.
      }
      this.setCache(cacheKey, true)
    } catch (error: unknown) {
      logger.warn('更新账户数据失败', { accountId, error: OptimizedHedgingSystem.describeError(error) })
    }
  }

  /**
   * Refreshes market data unless a fresh cache marker exists.
   * Currently a placeholder that only writes the cache marker.
   */
  private async updateMarketData(): Promise<void> {
    const cacheKey = 'market_data'
    if (this.getFromCache(cacheKey)) {
      return
    }
    try {
      // Price fetching is not implemented yet; hook the existing price source in here.
      this.setCache(cacheKey, true)
    } catch (error: unknown) {
      logger.warn('更新市场数据失败', { error: OptimizedHedgingSystem.describeError(error) })
    }
  }

  /**
   * Combines hedge-pair status, basis risk and convergence risk into one assessment.
   * Emits `riskAlert` for HIGH/CRITICAL and triggers an emergency stop for CRITICAL.
   *
   * @returns `true` when an emergency stop was triggered, otherwise `false`.
   */
  private async performIntegratedRiskCheck(): Promise<boolean> {
    try {
      const hedgeStatuses = this.platformManager.getHedgePairStatuses()
      const basisRisk = await this.checkBasisRisk()
      const convergenceRisk = await this.checkConvergenceRisk()
      const overallRisk = this.calculateOverallRisk(hedgeStatuses, basisRisk, convergenceRisk)
      if (overallRisk.level !== 'HIGH' && overallRisk.level !== 'CRITICAL') {
        return false
      }
      this.emit('riskAlert', overallRisk)
      if (overallRisk.level === 'CRITICAL') {
        await this.triggerEmergencyStop()
        return true
      }
      return false
    } catch (error: unknown) {
      logger.error('集成风险检查失败', { error: OptimizedHedgingSystem.describeError(error) })
      return false
    }
  }

  /**
   * Inspects the latest basis point of every monitored symbol.
   * |basis| above `maxDeviation` yields a HIGH event, above `alertThreshold` a MEDIUM one.
   *
   * NOTE(review): `overallRisk` is 'HIGH' as soon as any event exists, including
   * MEDIUM-only events — confirm that this is intended.
   */
  private async checkBasisRisk(): Promise<BasisRiskAssessment> {
    const { maxDeviation, alertThreshold } = this.config.basisConfig
    const riskEvents: BasisRiskEvent[] = []
    for (const symbol of this.getMonitoredSymbols()) {
      const history = this.basisData.get(symbol) ?? []
      const latest = history[history.length - 1]
      if (!latest) {
        continue
      }
      const magnitude = Math.abs(latest.basis)
      if (magnitude > maxDeviation) {
        riskEvents.push({
          symbol,
          basis: latest.basis,
          threshold: maxDeviation,
          severity: 'HIGH',
          timestamp: Date.now(),
        })
      } else if (magnitude > alertThreshold) {
        riskEvents.push({
          symbol,
          basis: latest.basis,
          threshold: alertThreshold,
          severity: 'MEDIUM',
          timestamp: Date.now(),
        })
      }
    }
    return {
      overallRisk: riskEvents.length > 0 ? 'HIGH' : 'LOW',
      events: riskEvents,
      timestamp: Date.now(),
    }
  }

  /**
   * Flags every hedge pair whose |netExposure| exceeds `maxPriceDeviation`.
   * Net exposure is used as a simplified stand-in for price deviation.
   */
  private async checkConvergenceRisk(): Promise<ConvergenceRiskAssessment> {
    const limit = this.config.convergenceConfig.maxPriceDeviation
    const riskEvents: ConvergenceRiskEvent[] = []
    for (const status of this.platformManager.getHedgePairStatuses()) {
      const priceDeviation = Math.abs(status.netExposure)
      if (priceDeviation > limit) {
        riskEvents.push({
          pairId: status.pairId,
          deviation: priceDeviation,
          threshold: limit,
          severity: 'HIGH',
          timestamp: Date.now(),
        })
      }
    }
    return {
      overallRisk: riskEvents.length > 0 ? 'HIGH' : 'LOW',
      events: riskEvents,
      timestamp: Date.now(),
    }
  }

  /**
   * Scores overall risk out of 100: basis risk 40, convergence risk 35, any inactive
   * hedge pair 25. Levels: >= 80 CRITICAL, >= 60 HIGH, >= 30 MEDIUM, otherwise LOW.
   */
  private calculateOverallRisk(
    hedgeStatuses: any[],
    basisRisk: BasisRiskAssessment,
    convergenceRisk: ConvergenceRiskAssessment,
  ): OverallRiskAssessment {
    let riskScore = 0
    const factors: string[] = []
    if (basisRisk.overallRisk === 'HIGH') {
      riskScore += 40
      factors.push(`基差风险高:${basisRisk.events.length}个事件`)
    }
    if (convergenceRisk.overallRisk === 'HIGH') {
      riskScore += 35
      factors.push(`收敛风险高:${convergenceRisk.events.length}个事件`)
    }
    const inactiveHedges = hedgeStatuses.filter(h => !h.isActive).length
    if (inactiveHedges > 0) {
      riskScore += 25
      factors.push(`${inactiveHedges}个对冲对未激活`)
    }
    let level: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL'
    if (riskScore >= 80) {
      level = 'CRITICAL'
    } else if (riskScore >= 60) {
      level = 'HIGH'
    } else if (riskScore >= 30) {
      level = 'MEDIUM'
    } else {
      level = 'LOW'
    }
    return {
      level,
      score: riskScore,
      factors,
      basisRisk,
      convergenceRisk,
      timestamp: Date.now(),
    }
  }

  /**
   * Rebalances every hedge pair whose |netExposure| exceeds `maxPriceDeviation` and emits
   * `convergenceAdjustment` for each successful rebalance. A failure on one pair is
   * logged and does not prevent the remaining pairs from being processed.
   */
  private async executeConvergenceCheck(): Promise<void> {
    try {
      const limit = this.config.convergenceConfig.maxPriceDeviation
      for (const status of this.platformManager.getHedgePairStatuses()) {
        // Written as a negated `>` so a NaN exposure is skipped, never rebalanced.
        if (!(Math.abs(status.netExposure) > limit)) {
          continue
        }
        this.performanceMetrics.totalAdjustments++
        try {
          await this.platformManager.rebalanceHedgePair(status.pairId, 0.01)
        } catch (error: unknown) {
          logger.error('收敛调整失败', {
            pairId: status.pairId,
            error: OptimizedHedgingSystem.describeError(error),
          })
          continue
        }
        this.performanceMetrics.successfulAdjustments++
        this.performanceMetrics.convergenceEvents++
        this.emit('convergenceAdjustment', {
          pairId: status.pairId,
          netExposure: status.netExposure,
          timestamp: Date.now(),
        })
      }
    } catch (error: unknown) {
      logger.error('收敛检查失败', { error: OptimizedHedgingSystem.describeError(error) })
    }
  }

  /**
   * Fetches spot and futures prices for every monitored symbol and records the basis.
   * Symbols for which either price is missing (falsy) are skipped.
   */
  private async executeBasisAnalysis(): Promise<void> {
    try {
      for (const symbol of this.getMonitoredSymbols()) {
        const spotPrice = await this.getSpotPrice(symbol)
        const futuresPrice = await this.getFuturesPrice(symbol)
        if (spotPrice && futuresPrice) {
          this.updateBasisData(symbol, spotPrice, futuresPrice)
        }
      }
    } catch (error: unknown) {
      logger.error('基差分析失败', { error: OptimizedHedgingSystem.describeError(error) })
    }
  }

  /**
   * Appends a basis data point for `symbol`, drops points older than
   * `historyRetentionHours`, and emits `basisAlert` when |basis| exceeds `alertThreshold`.
   *
   * Input that is not finite, or a zero spot price (which would make the percentage
   * undefined), is ignored.
   */
  private updateBasisData(symbol: string, spotPrice: number, futuresPrice: number): void {
    if (!Number.isFinite(spotPrice) || !Number.isFinite(futuresPrice) || spotPrice === 0) {
      return
    }
    let history = this.basisData.get(symbol)
    if (!history) {
      history = []
      this.basisData.set(symbol, history)
    }
    const now = Date.now()
    const basis = futuresPrice - spotPrice
    const basisPercent = (basis / spotPrice) * 100
    const dataPoint: BasisDataPoint = {
      spotPrice,
      futuresPrice,
      basis,
      basisPercent,
      timestamp: now,
    }
    history.push(dataPoint)
    // Retention is time based: points are not added on every main cycle, so a cap derived
    // from the cycle interval would keep far more than the configured number of hours.
    const cutoff = now - this.config.basisConfig.historyRetentionHours * 3600 * 1000
    const firstFresh = history.findIndex(point => point.timestamp >= cutoff)
    // Always keep at least the point that was just added.
    const staleCount = firstFresh === -1 ? history.length - 1 : firstFresh
    if (staleCount > 0) {
      history.splice(0, staleCount)
    }
    if (Math.abs(basis) > this.config.basisConfig.alertThreshold) {
      this.performanceMetrics.basisAlerts++
      this.emit('basisAlert', {
        symbol,
        basis,
        basisPercent,
        threshold: this.config.basisConfig.alertThreshold,
        timestamp: now,
      })
    }
  }

  /**
   * Returns the cached value for `key`, or `null` when it is missing or older than
   * `cacheTimeoutMs`.
   */
  private getFromCache(key: string): unknown {
    const entry = this.dataCache.get(key)
    if (entry && Date.now() - entry.timestamp < this.config.cacheTimeoutMs) {
      return entry.data
    }
    return null
  }

  /** Stores `data` under `key`, stamped with the current time. */
  private setCache(key: string, data: unknown): void {
    this.dataCache.set(key, {
      data,
      timestamp: Date.now(),
    })
  }

  /** Builds the initial enhanced-state record for an account. */
  private createEnhancedAccountState(accountId: string): EnhancedAccountState {
    return {
      accountId,
      lastUpdate: Date.now(),
      convergenceHistory: [],
      stopLossOrders: [],
      riskScore: 0,
    }
  }

  /** Builds zeroed performance counters; `systemUptime` holds the creation timestamp. */
  private initializeMetrics(): PerformanceMetrics {
    return {
      totalAdjustments: 0,
      successfulAdjustments: 0,
      basisAlerts: 0,
      convergenceEvents: 0,
      systemUptime: Date.now(),
    }
  }

  /**
   * Returns the distinct symbols of all hedge pairs, in first-seen order. Duplicates are
   * removed so a symbol shared by several pairs is analysed once per pass.
   */
  private getMonitoredSymbols(): string[] {
    const hedgeStatuses = this.platformManager.getHedgePairStatuses()
    const symbols = new Set<string>()
    for (const status of hedgeStatuses) {
      symbols.add(status.symbol)
    }
    return Array.from(symbols)
  }

  /** Placeholder spot-price source; currently returns a fixed mock price. */
  private async getSpotPrice(symbol: string): Promise<number | null> {
    // TODO: implement real spot price retrieval
    return 50000 // mock price
  }

  /** Placeholder futures-price source; currently returns a fixed mock price. */
  private async getFuturesPrice(symbol: string): Promise<number | null> {
    // TODO: implement real futures price retrieval
    return 50100 // mock price
  }

  /** Placeholder health check; currently only logs. */
  private async performSystemHealthCheck(): Promise<void> {
    logger.debug('系统健康检查完成')
  }

  /**
   * Refreshes the metrics timestamp.
   * NOTE(review): this overwrites `systemUptime` with the current time rather than
   * computing a duration — confirm the intended meaning of the field.
   */
  private async updatePerformanceMetrics(): Promise<void> {
    this.performanceMetrics.systemUptime = Date.now()
  }

  /** Placeholder for strategy evaluation and execution; currently does nothing. */
  private async checkAndExecuteStrategies(): Promise<void> {
    // Not implemented yet
  }

  /** Stops the system and then emits `emergencyStop`. */
  private async triggerEmergencyStop(): Promise<void> {
    logger.warn('触发紧急停止')
    await this.stop()
    this.emit('emergencyStop', { timestamp: Date.now() })
  }

  /**
   * Returns a snapshot of the system: running flag, pair/account counts, metrics,
   * cache size and the current cycle counter.
   */
  getSystemStatus(): OptimizedSystemStatus {
    return {
      isRunning: !!this.mainInterval,
      activePairs: this.platformManager.getHedgePairStatuses().length,
      accountCount: this.accountStates.size,
      performanceMetrics: this.performanceMetrics,
      cacheEntries: this.dataCache.size,
      cycleCount: this.cycleCount,
    }
  }
}
- // 类型定义
/**
 * Credentials and identity for one trading account.
 */
export interface AccountConfig {
  /** Account identifier; also used as the key for per-account state tracking. */
  account: string
  /** Private key for the account (presumably used for signing — confirm against the platform manager). */
  privateKey: string
  /** Optional agent wallet. */
  agentWallet?: string
  /** Optional private key belonging to the agent wallet. */
  agentPrivateKey?: string
}
/**
 * Full configuration of the optimized hedging system.
 */
export interface OptimizedConfig {
  /** Period of the main loop timer, in milliseconds. */
  mainCycleIntervalMs: number
  /** Maximum age, in milliseconds, for a cache entry to count as fresh. */
  cacheTimeoutMs: number
  /** Basis (futures price minus spot price) monitoring thresholds. */
  basisConfig: {
    /** Absolute basis above which a HIGH-severity risk event is raised. */
    maxDeviation: number
    /** Absolute basis above which a MEDIUM-severity event / basis alert is raised. */
    alertThreshold: number
    /** Controls how much basis history is retained per symbol, expressed in hours. */
    historyRetentionHours: number
  }
  /** Convergence (rebalancing) settings. */
  convergenceConfig: {
    /** Limit compared against a hedge pair's absolute net exposure. */
    maxPriceDeviation: number
    /** NOTE(review): not referenced by the hedging system code visible here. */
    convergenceStepPercent: number
    /** NOTE(review): not referenced by the hedging system code visible here. */
    maxDailyAdjustments: number
  }
  /** Stop-loss / take-profit defaults. NOTE(review): not referenced by the code visible here. */
  stopLossConfig: {
    defaultStopLoss: number
    defaultTakeProfit: number
    enableTrailing: boolean
  }
  /** Risk limits; handed to the platform hedging manager on construction. */
  riskLimits: {
    maxPositionSize: number
    maxTotalExposure: number
    emergencyStopThreshold: number
  }
}
/**
 * One cached value together with the time it was stored.
 */
export interface CacheEntry {
  /** Cached payload. */
  data: any
  /** Epoch milliseconds (`Date.now()`) at which the entry was written. */
  timestamp: number
}
/**
 * Extra per-account state tracked by the hedging system.
 */
export interface EnhancedAccountState {
  /** Account identifier. */
  accountId: string
  /** Epoch milliseconds of creation or of the most recent successful position refresh. */
  lastUpdate: number
  /** Starts empty; element shape is not defined in the code visible here. */
  convergenceHistory: any[]
  /** Starts empty; element shape is not defined in the code visible here. */
  stopLossOrders: any[]
  /** Starts at 0. */
  riskScore: number
}
/**
 * One spot/futures price observation and the basis derived from it.
 */
export interface BasisDataPoint {
  /** Spot price at observation time. */
  spotPrice: number
  /** Futures price at observation time. */
  futuresPrice: number
  /** `futuresPrice - spotPrice`. */
  basis: number
  /** `basis / spotPrice * 100`. */
  basisPercent: number
  /** Epoch milliseconds at which the point was recorded. */
  timestamp: number
}
/**
 * A basis threshold breach for one symbol.
 */
export interface BasisRiskEvent {
  /** Symbol whose basis breached a threshold. */
  symbol: string
  /** Signed basis value that triggered the event. */
  basis: number
  /** The configured threshold that was exceeded. */
  threshold: number
  /** Severity of the breach. */
  severity: 'LOW' | 'MEDIUM' | 'HIGH'
  /** Epoch milliseconds at which the event was created. */
  timestamp: number
}
/**
 * A convergence threshold breach for one hedge pair.
 */
export interface ConvergenceRiskEvent {
  /** Hedge pair that breached the limit. */
  pairId: string
  /** Measured deviation (the pair's absolute net exposure in the visible code). */
  deviation: number
  /** The configured limit that was exceeded. */
  threshold: number
  /** Severity of the breach. */
  severity: 'LOW' | 'MEDIUM' | 'HIGH'
  /** Epoch milliseconds at which the event was created. */
  timestamp: number
}
/**
 * Result of one basis risk pass over all monitored symbols.
 */
export interface BasisRiskAssessment {
  /** 'HIGH' when at least one event was raised, otherwise 'LOW'. */
  overallRisk: 'LOW' | 'HIGH'
  /** Events raised during the pass. */
  events: BasisRiskEvent[]
  /** Epoch milliseconds at which the assessment was produced. */
  timestamp: number
}
/**
 * Result of one convergence risk pass over all hedge pairs.
 */
export interface ConvergenceRiskAssessment {
  /** 'HIGH' when at least one event was raised, otherwise 'LOW'. */
  overallRisk: 'LOW' | 'HIGH'
  /** Events raised during the pass. */
  events: ConvergenceRiskEvent[]
  /** Epoch milliseconds at which the assessment was produced. */
  timestamp: number
}
/**
 * Combined risk verdict built from the basis, convergence and hedge-status checks.
 */
export interface OverallRiskAssessment {
  /** Risk level derived from `score`. */
  level: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL'
  /** Weighted risk score. */
  score: number
  /** Human-readable descriptions of each contributing factor. */
  factors: string[]
  /** The basis assessment that fed into the score. */
  basisRisk: BasisRiskAssessment
  /** The convergence assessment that fed into the score. */
  convergenceRisk: ConvergenceRiskAssessment
  /** Epoch milliseconds at which the assessment was produced. */
  timestamp: number
}
/**
 * Running counters describing system activity. All counters start at 0.
 */
export interface PerformanceMetrics {
  /** Count of adjustments attempted. */
  totalAdjustments: number
  /** Count of adjustments that succeeded. */
  successfulAdjustments: number
  /** Count of basis alerts. */
  basisAlerts: number
  /** Count of convergence events. */
  convergenceEvents: number
  /**
   * Holds a `Date.now()` timestamp in the code visible here, not a duration.
   * NOTE(review): confirm the intended meaning.
   */
  systemUptime: number
}
/**
 * Point-in-time snapshot of the hedging system.
 */
export interface OptimizedSystemStatus {
  /** True while the main loop timer exists. */
  isRunning: boolean
  /** Number of hedge pairs reported by the platform manager. */
  activePairs: number
  /** Number of accounts with tracked state. */
  accountCount: number
  /** Current performance counters. */
  performanceMetrics: PerformanceMetrics
  /** Number of entries currently held in the data cache. */
  cacheEntries: number
  /** Current value of the main loop's cycle counter. */
  cycleCount: number
}
|