import { EventEmitter } from 'events' import { logger } from '../../utils/logger.js' /** * 基差管理器 - 监控和管理现货与期货之间的基差风险 */ export class BasisManager extends EventEmitter { private basisHistory: Map = new Map() private alertConfig: BasisAlertConfig private isMonitoring: boolean = false private monitoringInterval?: NodeJS.Timeout constructor(alertConfig?: Partial) { super() this.alertConfig = { maxBasisDeviation: alertConfig?.maxBasisDeviation || 200, // $200 基差预警阈值 basisVolatilityThreshold: alertConfig?.basisVolatilityThreshold || 0.05, // 5% 波动阈值 historyRetentionHours: alertConfig?.historyRetentionHours || 24, // 保留24小时历史 monitoringIntervalMs: alertConfig?.monitoringIntervalMs || 5000, // 5秒监控间隔 enabledSymbols: alertConfig?.enabledSymbols || ['BTC-USD', 'ETH-USD'], } } /** * 更新基差数据 */ updateBasis(symbol: string, spotPrice: number, futuresPrice: number, timestamp?: number): BasisSnapshot { const now = timestamp || Date.now() const basis = futuresPrice - spotPrice const basisPercent = (basis / spotPrice) * 100 const snapshot: BasisSnapshot = { symbol, spotPrice, futuresPrice, basis, basisPercent, timestamp: now, } // 更新历史数据 this.updateBasisHistory(symbol, snapshot) // 计算基差统计 const stats = this.calculateBasisStats(symbol) // 检查风险预警 const riskLevel = this.assessBasisRisk(symbol, snapshot, stats) // 发出事件 this.emit('basisUpdate', { snapshot, stats, riskLevel }) if (riskLevel !== 'normal') { this.emit('basisAlert', { symbol, riskLevel, snapshot, stats, message: this.generateAlertMessage(symbol, riskLevel, snapshot, stats), }) logger.warn('基差风险预警', { symbol, basis, basisPercent: basisPercent.toFixed(4), riskLevel, spotPrice, futuresPrice, }) } return snapshot } /** * 批量更新多个交易对的基差 */ updateMultipleBasis(updates: Array<{ symbol: string; spotPrice: number; futuresPrice: number }>): BasisSnapshot[] { return updates.map(update => this.updateBasis(update.symbol, update.spotPrice, update.futuresPrice)) } /** * 获取基差快照 */ getBasisSnapshot(symbol: string): BasisSnapshot | null { const history = this.basisHistory.get(symbol) if (!history || history.length === 0) return null const latest = history[history.length - 1] return { symbol, spotPrice: latest.spotPrice, futuresPrice: latest.futuresPrice, basis: latest.basis, basisPercent: latest.basisPercent, timestamp: latest.timestamp, } } /** * 获取基差统计信息 */ getBasisStats(symbol: string): BasisStats | null { return this.calculateBasisStats(symbol) } /** * 评估基差风险等级 */ assessBasisRisk(symbol: string, snapshot: BasisSnapshot, stats?: BasisStats): BasisRiskLevel { const currentStats = stats || this.calculateBasisStats(symbol) if (!currentStats) return 'normal' const { basis, basisPercent } = snapshot const { mean, standardDeviation, volatility } = currentStats // 检查绝对基差是否超过阈值 if (Math.abs(basis) > this.alertConfig.maxBasisDeviation) { return 'high' } // 检查基差是否偏离历史均值过多(2个标准差) const deviationFromMean = Math.abs(basis - mean) if (deviationFromMean > 2 * standardDeviation) { return 'medium' } // 检查基差波动率是否过高 if (volatility > this.alertConfig.basisVolatilityThreshold) { return 'medium' } return 'normal' } /** * 计算基差调整建议 */ calculateAdjustmentSuggestion(symbol: string, targetNetExposure: number = 0): BasisAdjustmentSuggestion | null { const snapshot = this.getBasisSnapshot(symbol) const stats = this.getBasisStats(symbol) if (!snapshot || !stats) return null // 根据基差趋势和风险等级计算调整建议 const riskLevel = this.assessBasisRisk(symbol, snapshot, stats) const { basis, basisPercent } = snapshot const { trend, volatility } = stats let suggestion: BasisAdjustmentSuggestion = { symbol, adjustmentType: 'none', suggestedAction: 'hold', confidence: 0.5, reasoning: '基差风险正常', urgency: 'low', } if (riskLevel === 'high') { if (basis > 0 && trend === 'widening') { // 正基差扩大,建议减少期货多头或增加现货多头 suggestion = { symbol, adjustmentType: 'reduce_futures_long', suggestedAction: 'reduce_position', confidence: 0.8, reasoning: `正基差过大(${basis.toFixed(2)})且趋于扩大,建议减少期货多头敞口`, urgency: 'high', targetAdjustmentPercent: Math.min(0.3, Math.abs(basisPercent) / 100), // 最多调整30% } } else if (basis < 0 && trend === 'widening') { // 负基差扩大,建议减少期货空头或增加现货空头 suggestion = { symbol, adjustmentType: 'reduce_futures_short', suggestedAction: 'reduce_position', confidence: 0.8, reasoning: `负基差过大(${basis.toFixed(2)})且趋于扩大,建议减少期货空头敞口`, urgency: 'high', targetAdjustmentPercent: Math.min(0.3, Math.abs(basisPercent) / 100), } } } else if (riskLevel === 'medium') { suggestion = { symbol, adjustmentType: 'monitor_closely', suggestedAction: 'monitor', confidence: 0.6, reasoning: `基差风险中等,波动率${(volatility * 100).toFixed(2)}%,建议密切监控`, urgency: 'medium', } } return suggestion } /** * 开始监控基差 */ startMonitoring(): void { if (this.isMonitoring) return this.isMonitoring = true this.monitoringInterval = setInterval(() => { this.performRoutineChecks() }, this.alertConfig.monitoringIntervalMs) logger.info('基差监控已启动', { interval: this.alertConfig.monitoringIntervalMs, symbols: this.alertConfig.enabledSymbols, }) this.emit('monitoringStarted') } /** * 停止监控基差 */ stopMonitoring(): void { if (!this.isMonitoring) return this.isMonitoring = false if (this.monitoringInterval) { clearInterval(this.monitoringInterval) this.monitoringInterval = undefined } logger.info('基差监控已停止') this.emit('monitoringStopped') } /** * 执行例行检查 */ private performRoutineChecks(): void { for (const symbol of this.alertConfig.enabledSymbols) { const snapshot = this.getBasisSnapshot(symbol) if (!snapshot) continue const stats = this.getBasisStats(symbol) if (!stats) continue const riskLevel = this.assessBasisRisk(symbol, snapshot, stats) if (riskLevel !== 'normal') { const suggestion = this.calculateAdjustmentSuggestion(symbol) this.emit('routineAlert', { symbol, riskLevel, snapshot, stats, suggestion, }) } } // 清理过期数据 this.cleanupOldData() } /** * 更新基差历史数据 */ private updateBasisHistory(symbol: string, snapshot: BasisSnapshot): void { if (!this.basisHistory.has(symbol)) { this.basisHistory.set(symbol, []) } const history = this.basisHistory.get(symbol)! const dataPoint: BasisDataPoint = { spotPrice: snapshot.spotPrice, futuresPrice: snapshot.futuresPrice, basis: snapshot.basis, basisPercent: snapshot.basisPercent, timestamp: snapshot.timestamp, } history.push(dataPoint) // 限制历史数据长度 const maxPoints = (this.alertConfig.historyRetentionHours * 3600 * 1000) / this.alertConfig.monitoringIntervalMs if (history.length > maxPoints) { history.splice(0, history.length - maxPoints) } } /** * 计算基差统计信息 */ private calculateBasisStats(symbol: string): BasisStats | null { const history = this.basisHistory.get(symbol) if (!history || history.length < 2) return null const basisValues = history.map(h => h.basis) const recentValues = history.slice(-10).map(h => h.basis) // 最近10个点 // 计算均值 const mean = basisValues.reduce((sum, val) => sum + val, 0) / basisValues.length // 计算标准差 const variance = basisValues.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / basisValues.length const standardDeviation = Math.sqrt(variance) // 计算波动率(基于最近数据) let volatility = 0 if (recentValues.length >= 2) { const returns = [] for (let i = 1; i < recentValues.length; i++) { const previousValue = recentValues[i - 1] if (previousValue !== 0) { returns.push((recentValues[i] - previousValue) / Math.abs(previousValue)) } } if (returns.length > 0) { const meanReturn = returns.reduce((sum, ret) => sum + ret, 0) / returns.length const returnVariance = returns.reduce((sum, ret) => sum + Math.pow(ret - meanReturn, 2), 0) / returns.length volatility = Math.sqrt(returnVariance) } } // 判断趋势 let trend: 'widening' | 'narrowing' | 'stable' = 'stable' if (recentValues.length >= 5) { const firstHalf = recentValues.slice(0, Math.floor(recentValues.length / 2)) const secondHalf = recentValues.slice(Math.floor(recentValues.length / 2)) const firstMean = firstHalf.reduce((sum, val) => sum + Math.abs(val), 0) / firstHalf.length const secondMean = secondHalf.reduce((sum, val) => sum + Math.abs(val), 0) / secondHalf.length const trendThreshold = standardDeviation * 0.5 if (secondMean - firstMean > trendThreshold) { trend = 'widening' } else if (firstMean - secondMean > trendThreshold) { trend = 'narrowing' } } return { mean, standardDeviation, volatility, min: Math.min(...basisValues), max: Math.max(...basisValues), trend, dataPoints: basisValues.length, } } /** * 生成预警消息 */ private generateAlertMessage( symbol: string, riskLevel: BasisRiskLevel, snapshot: BasisSnapshot, stats: BasisStats, ): string { const { basis, basisPercent } = snapshot const { trend, volatility } = stats switch (riskLevel) { case 'high': return `${symbol} 基差风险高:${basis > 0 ? '正' : '负'}基差 ${Math.abs(basis).toFixed(2)} (${Math.abs( basisPercent, ).toFixed(2)}%),趋势${trend === 'widening' ? '扩大' : trend === 'narrowing' ? '收窄' : '稳定'}` case 'medium': return `${symbol} 基差风险中等:基差 ${basis.toFixed(2)},波动率 ${(volatility * 100).toFixed(2)}%` default: return `${symbol} 基差正常` } } /** * 清理过期数据 */ private cleanupOldData(): void { const cutoffTime = Date.now() - this.alertConfig.historyRetentionHours * 3600 * 1000 for (const [symbol, history] of this.basisHistory) { const validData = history.filter(point => point.timestamp > cutoffTime) this.basisHistory.set(symbol, validData) } } /** * 获取监控状态 */ getMonitoringStatus(): BasisMonitoringStatus { return { isActive: this.isMonitoring, monitoredSymbols: this.alertConfig.enabledSymbols, alertConfig: this.alertConfig, dataPointCounts: new Map( Array.from(this.basisHistory.entries()).map(([symbol, history]) => [symbol, history.length]), ), } } } // 类型定义 export interface BasisSnapshot { symbol: string spotPrice: number futuresPrice: number basis: number // futuresPrice - spotPrice basisPercent: number // (basis / spotPrice) * 100 timestamp: number } export interface BasisDataPoint { spotPrice: number futuresPrice: number basis: number basisPercent: number timestamp: number } export interface BasisStats { mean: number // 平均基差 standardDeviation: number // 标准差 volatility: number // 波动率 min: number // 最小基差 max: number // 最大基差 trend: 'widening' | 'narrowing' | 'stable' // 趋势 dataPoints: number // 数据点数量 } export interface BasisAlertConfig { maxBasisDeviation: number // 最大基差偏差阈值 (USD) basisVolatilityThreshold: number // 基差波动率阈值 historyRetentionHours: number // 历史数据保留小时数 monitoringIntervalMs: number // 监控间隔毫秒 enabledSymbols: string[] // 启用监控的交易对 } export type BasisRiskLevel = 'normal' | 'medium' | 'high' export interface BasisAdjustmentSuggestion { symbol: string adjustmentType: 'none' | 'reduce_futures_long' | 'reduce_futures_short' | 'increase_spot_hedge' | 'monitor_closely' suggestedAction: 'hold' | 'reduce_position' | 'increase_hedge' | 'monitor' confidence: number // 0-1 reasoning: string urgency: 'low' | 'medium' | 'high' targetAdjustmentPercent?: number // 建议调整比例 } export interface BasisMonitoringStatus { isActive: boolean monitoredSymbols: string[] alertConfig: BasisAlertConfig dataPointCounts: Map } export const basisManager = new BasisManager()