/**
 * T031 demo: market-data failover bridge service.
 *
 * Demonstrates how the existing PacificaOrderbookManager can be integrated
 * with the new MarketDataFailoverService. Every collaborator in this file is
 * an in-memory mock; nothing here talks to a real exchange.
 */

import { pathToFileURL } from 'node:url'

import { logger } from '../src/utils/logger.js'

// ---------------------------------------------------------------------------
// Shared types and constants
// ---------------------------------------------------------------------------
// Object shapes are declared as `type` aliases (not interfaces) so they stay
// assignable to index-signature parameter types such as logger metadata.

type DataSource = 'primary' | 'backup' | 'synthetic'
type DataQuality = 'excellent' | 'good' | 'degraded' | 'poor'
type MarketTrend = 'bullish' | 'bearish' | 'neutral'

// `any[]` is deliberate: a generic event emitter cannot know listener arity,
// and `any` lets callers write `(error) => ...` without an annotation.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Listener = (...args: any[]) => unknown

type OrderbookManagerStatus = {
  name: string
  status: 'running' | 'stopped'
  lastUpdate: number
  details: {
    isConnected: boolean
    cachedSymbols: string[]
    totalCacheSize: number
  }
}

type OrderbookStat = {
  symbol: string
  spread: number
  spreadPercent: number
  midPrice: number
  bestBid: number
  bestAsk: number
  totalBidSize: number
  totalAskSize: number
  depthLevels: number
  updateFrequency: number
  /** Epoch milliseconds of the last update. */
  lastUpdate: number
}

type FailoverRequest = {
  triggerReason: string
  timeoutMs?: number
  maxAttempts?: number
}

type FailoverResult = {
  success: boolean
  newDataSource: DataSource
  failoverDuration: number
  originalDataSource: DataSource
  triggerReason: string
  error?: string
}

type MarketDataFeed = {
  feedId: string
  exchange: string
  symbol: string
  type: string
  wsEndpoint: string
  httpEndpoint: string
  status: string
  priority: number
  latency: number
  reliability: number
  lastUpdate: Date
  createdAt: Date
  updatedAt: Date
}

type SynthPriceSnapshot = {
  snapshotId: string
  symbol: string
  source: string
  price: number
  confidence: number
  factors: {
    historicalAverage: number
    volatility: number
    marketTrend: MarketTrend
  }
  createdAt: Date
  updatedAt: Date
}

type MonitoringEvent = {
  eventId: string
  type: string
  payload: Record<string, unknown>
  severity: 'INFO' | 'WARN'
  createdAt: Date
}

type FailoverStats = {
  totalFailovers: number
  successfulFailovers: number
  failedFailovers: number
  lastFailoverTime: number
  lastError: string | null
}

type BridgeStats = {
  primaryFeeds: number
  backupFeeds: number
  synthPriceSnapshots: number
  failoverEvents: number
  lastFailoverTime: number
  currentDataSource: DataSource
  dataQuality: DataQuality
  bridgeMode: boolean
}

/** Internal result of one failover attempt, before it is logged or returned. */
type FailoverOutcome =
  | { kind: 'switched'; newDataSource: DataSource }
  | { kind: 'rejected'; error: string | undefined }
  | { kind: 'threw'; error: string }

const HEALTH_CHECK_INTERVAL_MS = 5000
const FAILOVER_TIMEOUT_MS = 10000
const FAILOVER_MAX_ATTEMPTS = 3
const MAX_ORDERBOOK_AGE_MS = 30000
const MAX_SPREAD_PERCENT = 0.01
const MIN_UPDATE_FREQUENCY = 0.5
const DEMO_SETTLE_DELAY_MS = 6000

const SYNTH_SYMBOLS = ['BTC', 'ETH', 'SOL'] as const
const MARKET_TRENDS = ['bullish', 'bearish', 'neutral'] as const

const HISTORICAL_AVERAGES: Record<string, number> = {
  BTC: 108000,
  ETH: 3200,
  SOL: 180
}

const VOLATILITIES: Record<string, number> = {
  BTC: 0.02,
  ETH: 0.03,
  SOL: 0.04
}

/** Extracts a printable message from a value caught in a `catch` clause. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

// ---------------------------------------------------------------------------
// Mocks
// ---------------------------------------------------------------------------

/** Stand-in for the existing PacificaOrderbookManager, with a tiny event emitter. */
class MockPacificaOrderbookManager {
  private isRunning = false
  private readonly listeners = new Map<string, Listener[]>()

  async initialize(): Promise<void> {
    logger.info('模拟 PacificaOrderbookManager 初始化')
  }

  async start(): Promise<void> {
    logger.info('模拟 PacificaOrderbookManager 启动')
    this.isRunning = true
  }

  async stop(): Promise<void> {
    logger.info('模拟 PacificaOrderbookManager 停止')
    this.isRunning = false
  }

  /** Reports `running` or `stopped` depending on whether `start()` was called. */
  getStatus(): OrderbookManagerStatus {
    return {
      name: 'PacificaOrderbookManager',
      status: this.isRunning ? 'running' : 'stopped',
      lastUpdate: Date.now(),
      details: {
        isConnected: this.isRunning,
        cachedSymbols: ['BTC', 'ETH', 'SOL'],
        totalCacheSize: 3
      }
    }
  }

  /** Returns fixed sample order-book statistics for BTC and ETH. */
  getOrderbookStats(): OrderbookStat[] {
    const now = Date.now()
    return [
      {
        symbol: 'BTC',
        spread: 100,
        spreadPercent: 0.001,
        midPrice: 108000,
        bestBid: 107950,
        bestAsk: 108050,
        totalBidSize: 1.5,
        totalAskSize: 1.2,
        depthLevels: 10,
        updateFrequency: 2.0,
        lastUpdate: now - 5000 // updated 5 seconds ago
      },
      {
        symbol: 'ETH',
        spread: 20,
        spreadPercent: 0.002,
        midPrice: 3200,
        bestBid: 3190,
        bestAsk: 3210,
        totalBidSize: 10.0,
        totalAskSize: 8.5,
        depthLevels: 8,
        updateFrequency: 1.8,
        lastUpdate: now - 3000 // updated 3 seconds ago
      }
    ]
  }

  /** Simulates a manager failure by emitting an `error` event. */
  triggerError(error: Error): void {
    logger.warn('模拟订单簿管理器错误')
    this.emit('error', error)
  }

  /** Registers `listener` for `event`. */
  on(event: string, listener: Listener): void {
    const registered = this.listeners.get(event)
    if (registered) {
      registered.push(listener)
    } else {
      this.listeners.set(event, [listener])
    }
  }

  /** Invokes every listener registered for `event`, in registration order. */
  emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(...args)
    }
  }
}

/** Stand-in for MarketDataFailoverService; always succeeds after 200 ms. */
class MockMarketDataFailoverService {
  async triggerFailover(request: FailoverRequest): Promise<FailoverResult> {
    logger.info(`触发失效回退: ${request.triggerReason}`)

    // Simulate the time a failover takes.
    await new Promise<void>(resolve => setTimeout(resolve, 200))

    return {
      success: true,
      newDataSource: 'backup',
      failoverDuration: 200,
      originalDataSource: 'primary',
      triggerReason: request.triggerReason
    }
  }

  /** Returns fixed sample statistics. */
  getFailoverStats(): Record<string, number> {
    return {
      totalFailovers: 5,
      successfulFailovers: 4,
      failedFailovers: 1,
      averageFailoverTime: 150,
      lastFailoverTime: Date.now() - 30000
    }
  }
}

/** In-memory registry of market-data feeds keyed by `feedId`. */
class MockMarketDataFeedManager {
  private readonly feeds = new Map<string, MarketDataFeed>()

  addFeed(feed: MarketDataFeed): void {
    this.feeds.set(feed.feedId, feed)
    logger.info(`添加市场数据源: ${feed.feedId}`)
  }

  getAllFeeds(): MarketDataFeed[] {
    return Array.from(this.feeds.values())
  }
}

/** In-memory registry of synthetic price snapshots keyed by `snapshotId`. */
class MockSynthPriceSnapshotManager {
  private readonly snapshots = new Map<string, SynthPriceSnapshot>()

  addSnapshot(snapshot: SynthPriceSnapshot): void {
    this.snapshots.set(snapshot.snapshotId, snapshot)
    logger.info(`添加合成价格快照: ${snapshot.symbol}`)
  }

  getAllSnapshots(): SynthPriceSnapshot[] {
    return Array.from(this.snapshots.values())
  }
}

/** Append-only in-memory log of monitoring events. */
class MockMonitoringEventManager {
  private readonly events: MonitoringEvent[] = []

  addEvent(event: MonitoringEvent): void {
    this.events.push(event)
    logger.info(`添加监控事件: ${event.type}`)
  }

  getAllEvents(): readonly MonitoringEvent[] {
    return this.events
  }
}

// ---------------------------------------------------------------------------
// Bridge
// ---------------------------------------------------------------------------

/**
 * Simplified market-data failover bridge.
 *
 * Registers a primary and a backup feed plus synthetic price snapshots, polls
 * the order-book manager on a timer, and asks the failover service to switch
 * data source when the manager is not running, reports an error, or data
 * quality is assessed as poor.
 */
class SimpleMarketDataFailoverBridge {
  private isRunning = false
  private healthCheckInterval?: ReturnType<typeof setInterval>
  private currentDataSource: DataSource = 'primary'
  private readonly failoverStats: FailoverStats = {
    totalFailovers: 0,
    successfulFailovers: 0,
    failedFailovers: 0,
    lastFailoverTime: 0,
    lastError: null
  }

  constructor(
    private readonly pacificaOrderbookManager: MockPacificaOrderbookManager,
    private readonly marketDataFailoverService: MockMarketDataFailoverService,
    private readonly marketDataFeedManager: MockMarketDataFeedManager,
    private readonly synthPriceSnapshotManager: MockSynthPriceSnapshotManager,
    private readonly monitoringManager: MockMonitoringEventManager
  ) {}

  /**
   * Initializes feeds, starts the periodic health check and subscribes to
   * order-book manager errors.
   *
   * @throws Rethrows any initialization error after logging it; the bridge is
   * left stopped with no timer running.
   */
  async start(): Promise<void> {
    logger.info('启动简化市场数据失效回退桥接器')

    try {
      await this.initializeMarketDataFeeds()
      // Must be set before scheduling: performHealthCheck() bails out while
      // the bridge is not running, which would skip the immediate check.
      this.isRunning = true
      this.startHealthCheck()
      this.setupFailoverMonitoring()
      logger.info('简化市场数据失效回退桥接器启动成功')
    } catch (error: unknown) {
      // Do not leave a half-started bridge polling in the background.
      this.isRunning = false
      this.clearHealthCheckTimer()
      logger.error(`启动失败: ${errorMessage(error)}`)
      throw error
    }
  }

  /** Stops the periodic health check and marks the bridge as not running. */
  async stop(): Promise<void> {
    logger.info('停止简化市场数据失效回退桥接器')
    this.clearHealthCheckTimer()
    this.isRunning = false
  }

  /**
   * Requests a failover for `reason` and reports the outcome to the caller.
   * Never throws; a failure is described in the returned `message`.
   */
  async triggerFailover(reason: string): Promise<{
    success: boolean
    message: string
    newDataSource?: string
  }> {
    const outcome = await this.executeFailover(reason)

    if (outcome.kind === 'switched') {
      return {
        success: true,
        message: `失效回退成功,切换到: ${outcome.newDataSource}`,
        newDataSource: outcome.newDataSource
      }
    }
    if (outcome.kind === 'rejected') {
      return { success: false, message: `失效回退失败: ${outcome.error}` }
    }
    return { success: false, message: `失效回退异常: ${outcome.error}` }
  }

  /** Returns a snapshot of bridge counters, current source and data quality. */
  getBridgeStats(): BridgeStats {
    return {
      primaryFeeds: 1,
      backupFeeds: 1,
      synthPriceSnapshots: this.synthPriceSnapshotManager.getAllSnapshots().length,
      failoverEvents: this.failoverStats.totalFailovers,
      lastFailoverTime: this.failoverStats.lastFailoverTime,
      currentDataSource: this.currentDataSource,
      dataQuality: this.assessDataQuality(),
      bridgeMode: true
    }
  }

  /** Cancels the health-check timer, if any, and forgets its handle. */
  private clearHealthCheckTimer(): void {
    if (this.healthCheckInterval !== undefined) {
      clearInterval(this.healthCheckInterval)
      this.healthCheckInterval = undefined
    }
  }

  /** Registers the primary and backup feeds, then the synthetic snapshots. */
  private async initializeMarketDataFeeds(): Promise<void> {
    logger.info('初始化市场数据源')

    // Primary feed.
    const primaryFeed: MarketDataFeed = {
      feedId: 'pacifica-ws-primary',
      exchange: 'pacifica',
      symbol: 'BTC',
      type: 'websocket',
      wsEndpoint: 'wss://ws.pacifica.fi/ws',
      httpEndpoint: 'https://api.pacifica.fi/api/v1',
      status: 'active',
      priority: 1,
      latency: 50,
      reliability: 0.95,
      lastUpdate: new Date(),
      createdAt: new Date(),
      updatedAt: new Date()
    }
    this.marketDataFeedManager.addFeed(primaryFeed)

    // Backup feed.
    const backupFeed: MarketDataFeed = {
      feedId: 'pacifica-http-backup',
      exchange: 'pacifica',
      symbol: 'BTC',
      type: 'http',
      wsEndpoint: '',
      httpEndpoint: 'https://api.pacifica.fi/api/v1',
      status: 'standby',
      priority: 2,
      latency: 200,
      reliability: 0.90,
      lastUpdate: new Date(),
      createdAt: new Date(),
      updatedAt: new Date()
    }
    this.marketDataFeedManager.addFeed(backupFeed)

    await this.initializeSynthPriceSnapshots()

    logger.info('市场数据源初始化完成')
  }

  /** Creates one synthetic price snapshot per symbol in `SYNTH_SYMBOLS`. */
  private async initializeSynthPriceSnapshots(): Promise<void> {
    logger.info('初始化合成价格快照')

    for (const symbol of SYNTH_SYMBOLS) {
      const synthSnapshot: SynthPriceSnapshot = {
        snapshotId: `synth-${symbol.toLowerCase()}-${Date.now()}`,
        symbol,
        source: 'synthetic',
        price: this.generateSyntheticPrice(symbol),
        confidence: 0.8,
        factors: {
          historicalAverage: this.getHistoricalAverage(symbol),
          volatility: this.getVolatility(symbol),
          marketTrend: this.getMarketTrend(symbol)
        },
        createdAt: new Date(),
        updatedAt: new Date()
      }
      this.synthPriceSnapshotManager.addSnapshot(synthSnapshot)
    }

    logger.info('合成价格快照初始化完成')
  }

  /** Schedules the periodic health check and runs one immediately. */
  private startHealthCheck(): void {
    logger.info('启动市场数据健康检查')

    // Guard against stacking timers if start() is called more than once.
    this.clearHealthCheckTimer()

    this.healthCheckInterval = setInterval(() => {
      this.performHealthCheck().catch((error: unknown) => {
        logger.error(`健康检查失败: ${errorMessage(error)}`)
      })
    }, HEALTH_CHECK_INTERVAL_MS)

    // Run one check right away instead of waiting for the first tick.
    this.performHealthCheck().catch((error: unknown) => {
      logger.error(`初始健康检查失败: ${errorMessage(error)}`)
    })
  }

  /**
   * Checks manager status and data quality, triggers a failover when either
   * is bad, and records a monitoring event. Errors are logged, not thrown.
   */
  private async performHealthCheck(): Promise<void> {
    if (!this.isRunning) return

    logger.debug('执行市场数据健康检查')

    try {
      const primaryFeedStatus = this.pacificaOrderbookManager.getStatus()
      if (primaryFeedStatus.status !== 'running') {
        logger.warn('主要数据源状态异常,检查是否需要失效回退')
        await this.checkFailoverNeeded('primary_feed_unhealthy')
      }

      const dataQuality = this.assessDataQuality()
      if (dataQuality === 'poor') {
        logger.warn('数据质量较差,触发失效回退')
        await this.triggerFailover('data_quality_poor')
      }

      this.monitoringManager.addEvent({
        eventId: `health-check-${Date.now()}`,
        type: 'market-data-health-check',
        payload: {
          primaryFeedStatus: primaryFeedStatus.status,
          dataQuality,
          currentDataSource: this.currentDataSource,
          failoverStats: this.failoverStats
        },
        severity: dataQuality === 'poor' ? 'WARN' : 'INFO',
        createdAt: new Date()
      })
    } catch (error: unknown) {
      logger.error(`健康检查执行失败: ${errorMessage(error)}`)
    }
  }

  /**
   * Performs one failover attempt and updates `failoverStats`.
   *
   * Single place where the counters change, so that
   * `totalFailovers === successfulFailovers + failedFailovers` holds for
   * every caller and every outcome, including a throwing service.
   */
  private async executeFailover(reason: string): Promise<FailoverOutcome> {
    this.failoverStats.totalFailovers++

    try {
      const result = await this.marketDataFailoverService.triggerFailover({
        triggerReason: reason,
        timeoutMs: FAILOVER_TIMEOUT_MS,
        maxAttempts: FAILOVER_MAX_ATTEMPTS
      })

      if (result.success) {
        this.currentDataSource = result.newDataSource
        this.failoverStats.successfulFailovers++
        this.failoverStats.lastFailoverTime = Date.now()
        return { kind: 'switched', newDataSource: result.newDataSource }
      }

      this.failoverStats.failedFailovers++
      this.failoverStats.lastError = result.error ?? null
      return { kind: 'rejected', error: result.error }
    } catch (error: unknown) {
      const message = errorMessage(error)
      this.failoverStats.failedFailovers++
      this.failoverStats.lastError = message
      return { kind: 'threw', error: message }
    }
  }

  /** Requests a failover for `reason` and logs the outcome. Never throws. */
  private async checkFailoverNeeded(reason: string): Promise<void> {
    const outcome = await this.executeFailover(reason)

    if (outcome.kind === 'switched') {
      logger.info(`失效回退成功,切换到: ${outcome.newDataSource}`)
    } else if (outcome.kind === 'rejected') {
      logger.error(`失效回退失败: ${outcome.error}`)
    } else {
      logger.error(`检查失效回退需求失败: ${outcome.error}`)
    }
  }

  /**
   * Grades the order-book statistics reported by the manager.
   *
   * - `poor`: no statistics at all, or reading them threw.
   * - `degraded`: any entry is older than `MAX_ORDERBOOK_AGE_MS` or has a
   *   spread above `MAX_SPREAD_PERCENT`.
   * - `good`: any entry updates less often than `MIN_UPDATE_FREQUENCY`.
   * - `excellent`: otherwise.
   */
  private assessDataQuality(): DataQuality {
    try {
      const orderbookStats = this.pacificaOrderbookManager.getOrderbookStats()
      if (orderbookStats.length === 0) {
        return 'poor'
      }

      const now = Date.now()
      if (orderbookStats.some(stat => now - stat.lastUpdate > MAX_ORDERBOOK_AGE_MS)) {
        return 'degraded'
      }
      if (orderbookStats.some(stat => stat.spreadPercent > MAX_SPREAD_PERCENT)) {
        return 'degraded'
      }
      if (orderbookStats.some(stat => stat.updateFrequency < MIN_UPDATE_FREQUENCY)) {
        return 'good'
      }
      return 'excellent'
    } catch (error: unknown) {
      logger.error(`评估数据质量失败: ${errorMessage(error)}`)
      return 'poor'
    }
  }

  /** Subscribes to manager `error` events and records a bridge-start event. */
  private setupFailoverMonitoring(): void {
    logger.info('设置失效回退监控')

    // React to order-book manager errors by attempting a failover.
    this.pacificaOrderbookManager.on('error', async error => {
      logger.warn('订单簿管理器错误,检查失效回退', { error: errorMessage(error) })
      await this.checkFailoverNeeded('orderbook_manager_error')
    })

    this.monitoringManager.addEvent({
      eventId: `failover-bridge-start-${Date.now()}`,
      type: 'failover-bridge-start',
      payload: {
        primaryFeeds: 1,
        backupFeeds: 1,
        synthSnapshots: 3
      },
      severity: 'INFO',
      createdAt: new Date()
    })

    logger.info('失效回退监控设置完成')
  }

  /** Returns the historical average perturbed by up to ± the symbol's volatility. */
  private generateSyntheticPrice(symbol: string): number {
    const basePrice = this.getHistoricalAverage(symbol)
    const volatility = this.getVolatility(symbol)
    const randomFactor = (Math.random() - 0.5) * 2 * volatility
    return basePrice * (1 + randomFactor)
  }

  /** Looks up the hard-coded average for `symbol`; 1000 when unknown. */
  private getHistoricalAverage(symbol: string): number {
    return HISTORICAL_AVERAGES[symbol] ?? 1000
  }

  /** Looks up the hard-coded volatility for `symbol`; 0.02 when unknown. */
  private getVolatility(symbol: string): number {
    return VOLATILITIES[symbol] ?? 0.02
  }

  /** Picks a trend uniformly at random; the symbol is not consulted. */
  private getMarketTrend(_symbol: string): MarketTrend {
    const index = Math.floor(Math.random() * MARKET_TRENDS.length)
    return MARKET_TRENDS[index] ?? 'neutral'
  }
}

// ---------------------------------------------------------------------------
// Demo driver
// ---------------------------------------------------------------------------

/** Runs the end-to-end demo; exits the process with code 1 on failure. */
async function main(): Promise<void> {
  logger.info('=== T031 市场数据失效回退桥接演示 ===')

  try {
    // 1. Create the mock components.
    const mockOrderbookManager = new MockPacificaOrderbookManager()
    const mockFailoverService = new MockMarketDataFailoverService()
    const mockFeedManager = new MockMarketDataFeedManager()
    const mockSynthSnapshotManager = new MockSynthPriceSnapshotManager()
    const mockMonitoringManager = new MockMonitoringEventManager()

    // 2. Initialize and start the order-book manager.
    await mockOrderbookManager.initialize()
    await mockOrderbookManager.start()

    // 3. Create the failover bridge.
    const marketDataBridge = new SimpleMarketDataFailoverBridge(
      mockOrderbookManager,
      mockFailoverService,
      mockFeedManager,
      mockSynthSnapshotManager,
      mockMonitoringManager
    )

    // 4. Start the bridge.
    logger.info('启动市场数据失效回退桥接器...')
    await marketDataBridge.start()

    // 5. Inspect the initial state.
    const initialStats = marketDataBridge.getBridgeStats()
    logger.info('初始市场数据桥接统计:', initialStats)

    // 6. Run a manual failover.
    logger.info('执行手动失效回退测试...')
    const failoverResult = await marketDataBridge.triggerFailover('manual_test')
    logger.info('失效回退结果:', failoverResult)

    // 7. Simulate a data-source error.
    logger.info('模拟主要数据源错误...')
    mockOrderbookManager.triggerError(new Error('模拟WebSocket连接错误'))

    // 8. Give the periodic health check time to run.
    await new Promise<void>(resolve => setTimeout(resolve, DEMO_SETTLE_DELAY_MS))

    // 9. Inspect the state after failover.
    const finalStats = marketDataBridge.getBridgeStats()
    logger.info('最终市场数据桥接统计:', finalStats)

    // 10. Verify data consistency.
    logger.info('验证数据一致性...')
    const marketDataFeeds = mockFeedManager.getAllFeeds()
    const synthSnapshots = mockSynthSnapshotManager.getAllSnapshots()
    const monitoringEvents = mockMonitoringManager.getAllEvents()

    logger.info(`验证结果:`)
    logger.info(`- 市场数据源: ${marketDataFeeds.length}`)
    logger.info(`- 合成价格快照: ${synthSnapshots.length}`)
    logger.info(`- 监控事件: ${monitoringEvents.length}`)
    logger.info(`- 当前数据源: ${finalStats.currentDataSource}`)
    logger.info(`- 数据质量: ${finalStats.dataQuality}`)
    logger.info(`- 失效回退事件: ${finalStats.failoverEvents}`)
    logger.info(`- 桥接模式: ${finalStats.bridgeMode ? '✅' : '❌'}`)
    logger.info(`- 数据一致性: ${marketDataFeeds.length > 0 && synthSnapshots.length > 0 ? '✅' : '❌'}`)

    // 11. Stop the bridge.
    await marketDataBridge.stop()

    logger.info('=== T031 市场数据失效回退桥接演示完成 ===')
  } catch (error: unknown) {
    logger.error('T031 演示失败:', {
      error: errorMessage(error),
      stack: error instanceof Error ? error.stack : undefined
    })
    process.exit(1)
  }
}

// Run the demo only when this file is the process entry point.
// pathToFileURL() handles Windows drive letters and percent-encoding, which a
// hand-built `file://${path}` string does not.
const entryScript = process.argv[1]
if (entryScript !== undefined && import.meta.url === pathToFileURL(entryScript).href) {
  main().catch(error => {
    logger.error('演示运行失败:', error)
    process.exit(1)
  })
}