| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- /**
- * T031 演示:市场数据失效回退桥接服务
- * 演示如何集成现有的 PacificaOrderbookManager 与新的 MarketDataFailoverService
- */
import { pathToFileURL } from 'node:url'

import { logger } from '../src/utils/logger.js'
- // 模拟现有的 PacificaOrderbookManager
/** Callback shape accepted by the mock's minimal event emitter. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- listeners receive whatever emit() forwards
type MockOrderbookListener = (...args: any[]) => unknown

/** Snapshot returned by {@link MockPacificaOrderbookManager.getStatus}. */
interface MockOrderbookManagerStatus {
  name: string
  status: 'running' | 'stopped'
  lastUpdate: number
  details: {
    isConnected: boolean
    cachedSymbols: string[]
    totalCacheSize: number
  }
}

/** One per-symbol entry returned by {@link MockPacificaOrderbookManager.getOrderbookStats}. */
interface MockOrderbookStat {
  symbol: string
  spread: number
  spreadPercent: number
  midPrice: number
  bestBid: number
  bestAsk: number
  totalBidSize: number
  totalAskSize: number
  depthLevels: number
  updateFrequency: number
  /** Epoch milliseconds of the last simulated update. */
  lastUpdate: number
}

/**
 * Mock of the existing PacificaOrderbookManager used by the demo.
 *
 * It only tracks a running flag, returns canned order book statistics and
 * offers a tiny `on`/`emit` event emitter so the demo can simulate errors.
 */
class MockPacificaOrderbookManager {
  private isRunning = false
  /** Registered listeners, keyed by event name, in registration order. */
  private readonly listeners = new Map<string, MockOrderbookListener[]>()

  /** Logs the simulated initialisation; no state is changed. */
  async initialize(): Promise<void> {
    logger.info('模拟 PacificaOrderbookManager 初始化')
  }

  /** Marks the mock as running. */
  async start(): Promise<void> {
    logger.info('模拟 PacificaOrderbookManager 启动')
    this.isRunning = true
  }

  /** Marks the mock as stopped. */
  async stop(): Promise<void> {
    logger.info('模拟 PacificaOrderbookManager 停止')
    this.isRunning = false
  }

  /**
   * Reports whether the mock is running.
   *
   * @returns A status object whose `status` is `'running'` after `start()` and
   *   `'stopped'` otherwise; `lastUpdate` is the time of the call.
   */
  getStatus(): MockOrderbookManagerStatus {
    return {
      name: 'PacificaOrderbookManager',
      status: this.isRunning ? 'running' : 'stopped',
      lastUpdate: Date.now(),
      details: {
        isConnected: this.isRunning,
        cachedSymbols: ['BTC', 'ETH', 'SOL'],
        totalCacheSize: 3
      }
    }
  }

  /**
   * Returns canned statistics for BTC and ETH.
   *
   * @returns Two entries whose `lastUpdate` is 5 s (BTC) and 3 s (ETH) before
   *   the time of the call.
   */
  getOrderbookStats(): MockOrderbookStat[] {
    const now = Date.now()
    return [
      {
        symbol: 'BTC',
        spread: 100,
        spreadPercent: 0.001,
        midPrice: 108000,
        bestBid: 107950,
        bestAsk: 108050,
        totalBidSize: 1.5,
        totalAskSize: 1.2,
        depthLevels: 10,
        updateFrequency: 2.0,
        lastUpdate: now - 5000 // updated 5 seconds ago
      },
      {
        symbol: 'ETH',
        spread: 20,
        spreadPercent: 0.002,
        midPrice: 3200,
        bestBid: 3190,
        bestAsk: 3210,
        totalBidSize: 10.0,
        totalAskSize: 8.5,
        depthLevels: 8,
        updateFrequency: 1.8,
        lastUpdate: now - 3000 // updated 3 seconds ago
      }
    ]
  }

  /**
   * Simulates a failure by emitting an `'error'` event.
   *
   * @param error The error handed to every `'error'` listener.
   */
  triggerError(error: Error): void {
    logger.warn('模拟订单簿管理器错误')
    this.emit('error', error)
  }

  /**
   * Registers a listener for an event.
   *
   * @param event Event name.
   * @param listener Callback invoked with the arguments passed to `emit`.
   */
  on(event: string, listener: MockOrderbookListener): void {
    const registered = this.listeners.get(event)
    if (registered) {
      registered.push(listener)
    } else {
      this.listeners.set(event, [listener])
    }
  }

  /**
   * Invokes every listener registered for `event`, in registration order.
   *
   * A synchronous throw from a listener propagates to the caller. A promise
   * returned by an async listener is observed so that its rejection is logged
   * instead of surfacing as an unhandled rejection.
   *
   * @param event Event name.
   * @param args Arguments forwarded to each listener.
   */
  emit(event: string, ...args: unknown[]): void {
    const registered = this.listeners.get(event)
    if (!registered) return

    // Iterate over a snapshot: listeners added during dispatch are not called
    // for the event that is currently being delivered.
    for (const listener of [...registered]) {
      const outcome = listener(...args)
      if (outcome instanceof Promise) {
        outcome.catch((reason: unknown) => {
          const detail = reason instanceof Error ? reason.message : String(reason)
          logger.error(`事件 ${event} 的监听器执行失败: ${detail}`)
        })
      }
    }
  }
}
- // 模拟 MarketDataFailoverService
/**
 * Mock of MarketDataFailoverService.
 *
 * Every failover "succeeds" after a fixed simulated delay and always reports a
 * switch from the primary to the backup source.
 */
class MockMarketDataFailoverService {
  /**
   * Simulates a failover.
   *
   * @param request Failover request; only `triggerReason` is read.
   * @returns A success result that echoes `request.triggerReason`.
   */
  async triggerFailover(request: any): Promise<any> {
    const simulatedDelayMs = 200
    const wait = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

    logger.info(`触发失效回退: ${request.triggerReason}`)

    // Stand-in for the time a real failover would take.
    await wait(simulatedDelayMs)

    const result = {
      success: true,
      newDataSource: 'backup',
      failoverDuration: simulatedDelayMs,
      originalDataSource: 'primary',
      triggerReason: request.triggerReason
    }
    return result
  }

  /**
   * Returns fixed sample statistics.
   *
   * @returns Canned counters; `lastFailoverTime` is 30 s before the call.
   */
  getFailoverStats(): any {
    const thirtySecondsAgo = Date.now() - 30000
    return {
      totalFailovers: 5,
      successfulFailovers: 4,
      failedFailovers: 1,
      averageFailoverTime: 150,
      lastFailoverTime: thirtySecondsAgo
    }
  }
}
- // 模拟其他管理器
/** In-memory registry of market data feeds, keyed by `feedId`. */
class MockMarketDataFeedManager {
  private feeds = new Map<string, any>()

  /**
   * Stores a feed, replacing any feed that has the same `feedId`.
   *
   * @param feed Feed record; `feedId` is used as the key.
   */
  addFeed(feed: any): void {
    const registry = this.feeds
    registry.set(feed.feedId, feed)
    logger.info(`添加市场数据源: ${feed.feedId}`)
  }

  /**
   * Lists the stored feeds.
   *
   * @returns A new array of the feeds in insertion order.
   */
  getAllFeeds(): any[] {
    return [...this.feeds.values()]
  }
}
/** In-memory registry of synthetic price snapshots, keyed by `snapshotId`. */
class MockSynthPriceSnapshotManager {
  private snapshots = new Map<string, any>()

  /**
   * Stores a snapshot, replacing any snapshot that has the same `snapshotId`.
   *
   * @param snapshot Snapshot record; `snapshotId` is the key and `symbol` is logged.
   */
  addSnapshot(snapshot: any): void {
    const registry = this.snapshots
    registry.set(snapshot.snapshotId, snapshot)
    logger.info(`添加合成价格快照: ${snapshot.symbol}`)
  }

  /**
   * Lists the stored snapshots.
   *
   * @returns A new array of the snapshots in insertion order.
   */
  getAllSnapshots(): any[] {
    return [...this.snapshots.values()]
  }
}
/** In-memory, append-only log of monitoring events. */
class MockMonitoringEventManager {
  private readonly events: any[] = []

  /**
   * Appends an event to the log.
   *
   * @param event Event record; its `type` is logged.
   */
  addEvent(event: any): void {
    this.events.push(event)
    logger.info(`添加监控事件: ${event.type}`)
  }

  /**
   * Lists the recorded events in insertion order.
   *
   * @returns A copy of the event list, so callers cannot alter the recorded
   *   history by mutating the returned array.
   */
  getAllEvents(): any[] {
    return [...this.events]
  }
}
- // 简化的市场数据失效回退桥接器实现
/** Data sources the bridge can currently be reading from. */
type BridgeDataSource = 'primary' | 'backup' | 'synthetic'

/** Result of one failover attempt, before it becomes a log line or a return value. */
type BridgeFailoverOutcome =
  | { kind: 'switched'; dataSource: BridgeDataSource }
  | { kind: 'rejected'; error: string }
  | { kind: 'threw'; error: string }

/**
 * Simplified bridge between the order book manager and the failover service.
 *
 * On start it registers a primary and a backup feed, seeds synthetic price
 * snapshots, subscribes to order book manager errors and runs a periodic
 * health check. An unhealthy primary feed, poor data quality or an order book
 * manager error triggers a failover through the failover service.
 */
class SimpleMarketDataFailoverBridge {
  private isRunning = false
  private healthCheckInterval?: ReturnType<typeof setInterval>
  /** Guards against subscribing to the order book manager more than once. */
  private errorListenerAttached = false
  private currentDataSource: BridgeDataSource = 'primary'
  private readonly failoverStats = {
    totalFailovers: 0,
    successfulFailovers: 0,
    failedFailovers: 0,
    lastFailoverTime: 0,
    lastError: null as string | null
  }

  /**
   * @param pacificaOrderbookManager Source of status, statistics and error events.
   * @param marketDataFailoverService Service that performs the actual failover.
   * @param marketDataFeedManager Registry that receives the primary/backup feeds.
   * @param synthPriceSnapshotManager Registry that receives the synthetic snapshots.
   * @param monitoringManager Sink for monitoring events.
   * @param healthCheckIntervalMs Delay between periodic health checks (default 5000 ms).
   */
  constructor(
    private readonly pacificaOrderbookManager: MockPacificaOrderbookManager,
    private readonly marketDataFailoverService: MockMarketDataFailoverService,
    private readonly marketDataFeedManager: MockMarketDataFeedManager,
    private readonly synthPriceSnapshotManager: MockSynthPriceSnapshotManager,
    private readonly monitoringManager: MockMonitoringEventManager,
    private readonly healthCheckIntervalMs: number = 5000
  ) {}

  /** Extracts a printable message from a caught value of unknown type. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }

  /**
   * Initialises the feeds, subscribes to errors and starts health checking.
   *
   * @throws Re-throws any initialisation error after logging it; the bridge is
   *   left stopped with no timer running in that case.
   */
  async start(): Promise<void> {
    logger.info('启动简化市场数据失效回退桥接器')

    try {
      await this.initializeMarketDataFeeds()
      this.setupFailoverMonitoring()
      // The running flag must be set before the first health check is
      // launched: performHealthCheck() returns immediately while it is false,
      // which would silently skip the check that runs at startup.
      this.isRunning = true
      this.startHealthCheck()
      logger.info('简化市场数据失效回退桥接器启动成功')
    } catch (error: unknown) {
      this.clearHealthCheckTimer()
      this.isRunning = false
      logger.error(`启动失败: ${SimpleMarketDataFailoverBridge.describeError(error)}`)
      throw error
    }
  }

  /** Stops the periodic health check and marks the bridge as stopped. */
  async stop(): Promise<void> {
    logger.info('停止简化市场数据失效回退桥接器')
    this.clearHealthCheckTimer()
    this.isRunning = false
  }

  /** Cancels the health check timer, if any, and forgets its handle. */
  private clearHealthCheckTimer(): void {
    if (this.healthCheckInterval !== undefined) {
      clearInterval(this.healthCheckInterval)
      this.healthCheckInterval = undefined
    }
  }

  /** Registers the primary and backup feeds, then seeds the synthetic snapshots. */
  private async initializeMarketDataFeeds(): Promise<void> {
    logger.info('初始化市场数据源')

    // Primary feed: WebSocket.
    const primaryFeed = {
      feedId: 'pacifica-ws-primary',
      exchange: 'pacifica',
      symbol: 'BTC',
      type: 'websocket',
      wsEndpoint: 'wss://ws.pacifica.fi/ws',
      httpEndpoint: 'https://api.pacifica.fi/api/v1',
      status: 'active',
      priority: 1,
      latency: 50,
      reliability: 0.95,
      lastUpdate: new Date(),
      createdAt: new Date(),
      updatedAt: new Date()
    }
    this.marketDataFeedManager.addFeed(primaryFeed)

    // Backup feed: HTTP.
    const backupFeed = {
      feedId: 'pacifica-http-backup',
      exchange: 'pacifica',
      symbol: 'BTC',
      type: 'http',
      wsEndpoint: '',
      httpEndpoint: 'https://api.pacifica.fi/api/v1',
      status: 'standby',
      priority: 2,
      latency: 200,
      reliability: 0.90,
      lastUpdate: new Date(),
      createdAt: new Date(),
      updatedAt: new Date()
    }
    this.marketDataFeedManager.addFeed(backupFeed)

    await this.initializeSynthPriceSnapshots()
    logger.info('市场数据源初始化完成')
  }

  /** Adds one synthetic price snapshot each for BTC, ETH and SOL. */
  private async initializeSynthPriceSnapshots(): Promise<void> {
    logger.info('初始化合成价格快照')
    const symbols = ['BTC', 'ETH', 'SOL']

    for (const symbol of symbols) {
      const synthSnapshot = {
        snapshotId: `synth-${symbol.toLowerCase()}-${Date.now()}`,
        symbol,
        source: 'synthetic',
        price: this.generateSyntheticPrice(symbol),
        confidence: 0.8,
        factors: {
          historicalAverage: this.getHistoricalAverage(symbol),
          volatility: this.getVolatility(symbol),
          marketTrend: this.getMarketTrend(symbol)
        },
        createdAt: new Date(),
        updatedAt: new Date()
      }
      this.synthPriceSnapshotManager.addSnapshot(synthSnapshot)
    }
    logger.info('合成价格快照初始化完成')
  }

  /** Schedules the periodic health check and runs one check right away. */
  private startHealthCheck(): void {
    logger.info('启动市场数据健康检查')

    // Never stack timers if the bridge is started more than once.
    this.clearHealthCheckTimer()

    this.healthCheckInterval = setInterval(() => {
      this.performHealthCheck().catch((error: unknown) => {
        logger.error(`健康检查失败: ${SimpleMarketDataFailoverBridge.describeError(error)}`)
      })
    }, this.healthCheckIntervalMs)

    // Run one check immediately instead of waiting for the first tick.
    this.performHealthCheck().catch((error: unknown) => {
      logger.error(`初始健康检查失败: ${SimpleMarketDataFailoverBridge.describeError(error)}`)
    })
  }

  /**
   * Checks the primary feed status and the data quality, triggers a failover
   * when either is bad, and records a health check monitoring event.
   * Does nothing while the bridge is stopped; errors are logged, not thrown.
   */
  private async performHealthCheck(): Promise<void> {
    if (!this.isRunning) return
    logger.debug('执行市场数据健康检查')

    try {
      const primaryFeedStatus = this.pacificaOrderbookManager.getStatus()

      if (primaryFeedStatus.status !== 'running') {
        logger.warn('主要数据源状态异常,检查是否需要失效回退')
        await this.checkFailoverNeeded('primary_feed_unhealthy')
      }

      const dataQuality = this.assessDataQuality()

      if (dataQuality === 'poor') {
        logger.warn('数据质量较差,触发失效回退')
        await this.triggerFailover('data_quality_poor')
      }

      this.monitoringManager.addEvent({
        eventId: `health-check-${Date.now()}`,
        type: 'market-data-health-check',
        payload: {
          primaryFeedStatus: primaryFeedStatus.status,
          dataQuality,
          currentDataSource: this.currentDataSource,
          failoverStats: this.failoverStats
        },
        severity: dataQuality === 'poor' ? 'WARN' : 'INFO',
        createdAt: new Date()
      })
    } catch (error: unknown) {
      logger.error(`健康检查执行失败: ${SimpleMarketDataFailoverBridge.describeError(error)}`)
    }
  }

  /**
   * Runs one failover attempt and updates the statistics.
   *
   * Shared by `checkFailoverNeeded` and `triggerFailover` so that both paths
   * count attempts the same way.
   *
   * @param reason Trigger reason forwarded to the failover service.
   * @returns What happened: switched, rejected by the service, or threw.
   */
  private async attemptFailover(reason: string): Promise<BridgeFailoverOutcome> {
    // Count the attempt up front so the total always equals successes plus
    // failures, including when the service throws.
    this.failoverStats.totalFailovers++

    try {
      const failoverResult = await this.marketDataFailoverService.triggerFailover({
        triggerReason: reason,
        timeoutMs: 10000,
        maxAttempts: 3
      })

      if (failoverResult.success) {
        this.currentDataSource = failoverResult.newDataSource as BridgeDataSource
        this.failoverStats.successfulFailovers++
        this.failoverStats.lastFailoverTime = Date.now()
        return { kind: 'switched', dataSource: this.currentDataSource }
      }

      const rejection = failoverResult.error == null ? 'unknown error' : String(failoverResult.error)
      this.failoverStats.failedFailovers++
      this.failoverStats.lastError = rejection
      return { kind: 'rejected', error: rejection }
    } catch (error: unknown) {
      const detail = SimpleMarketDataFailoverBridge.describeError(error)
      this.failoverStats.failedFailovers++
      this.failoverStats.lastError = detail
      return { kind: 'threw', error: detail }
    }
  }

  /**
   * Attempts a failover and logs the outcome; never throws.
   *
   * @param reason Trigger reason forwarded to the failover service.
   */
  private async checkFailoverNeeded(reason: string): Promise<void> {
    const outcome = await this.attemptFailover(reason)

    switch (outcome.kind) {
      case 'switched':
        logger.info(`失效回退成功,切换到: ${outcome.dataSource}`)
        break
      case 'rejected':
        logger.error(`失效回退失败: ${outcome.error}`)
        break
      case 'threw':
        logger.error(`检查失效回退需求失败: ${outcome.error}`)
        break
    }
  }

  /**
   * Attempts a failover and reports the outcome to the caller; never throws.
   *
   * @param reason Trigger reason forwarded to the failover service.
   * @returns `success` plus a human-readable `message`; `newDataSource` is set
   *   only when the failover succeeded.
   */
  async triggerFailover(reason: string): Promise<{
    success: boolean
    message: string
    newDataSource?: string
  }> {
    const outcome = await this.attemptFailover(reason)

    switch (outcome.kind) {
      case 'switched':
        return {
          success: true,
          message: `失效回退成功,切换到: ${outcome.dataSource}`,
          newDataSource: outcome.dataSource
        }
      case 'rejected':
        return {
          success: false,
          message: `失效回退失败: ${outcome.error}`
        }
      case 'threw':
        return {
          success: false,
          message: `失效回退异常: ${outcome.error}`
        }
    }
  }

  /**
   * Grades the order book statistics.
   *
   * @returns `'poor'` when there are no statistics or reading them fails;
   *   `'degraded'` when any entry is older than 30 s or has a spread above 1%;
   *   `'good'` when any entry updates less than 0.5 times per unit of
   *   `updateFrequency`; `'excellent'` otherwise.
   */
  private assessDataQuality(): 'excellent' | 'good' | 'degraded' | 'poor' {
    const maxAgeMs = 30000
    const maxSpreadPercent = 0.01
    const minUpdateFrequency = 0.5

    try {
      const orderbookStats = this.pacificaOrderbookManager.getOrderbookStats()
      if (orderbookStats.length === 0) {
        return 'poor'
      }

      const now = Date.now()
      if (orderbookStats.some(stat => now - stat.lastUpdate > maxAgeMs)) {
        return 'degraded'
      }
      if (orderbookStats.some(stat => stat.spreadPercent > maxSpreadPercent)) {
        return 'degraded'
      }
      if (orderbookStats.some(stat => stat.updateFrequency < minUpdateFrequency)) {
        return 'good'
      }
      return 'excellent'
    } catch (error: unknown) {
      logger.error(`评估数据质量失败: ${SimpleMarketDataFailoverBridge.describeError(error)}`)
      return 'poor'
    }
  }

  /**
   * Subscribes to order book manager errors (once per bridge instance) and
   * records the bridge-start monitoring event.
   */
  private setupFailoverMonitoring(): void {
    logger.info('设置失效回退监控')

    if (!this.errorListenerAttached) {
      this.pacificaOrderbookManager.on('error', async (error: unknown) => {
        // The listener cannot be removed, so ignore errors once the bridge has stopped.
        if (!this.isRunning) return
        logger.warn('订单簿管理器错误,检查失效回退', {
          error: SimpleMarketDataFailoverBridge.describeError(error)
        })
        await this.checkFailoverNeeded('orderbook_manager_error')
      })
      this.errorListenerAttached = true
    }

    // The counts mirror what initializeMarketDataFeeds() registers.
    this.monitoringManager.addEvent({
      eventId: `failover-bridge-start-${Date.now()}`,
      type: 'failover-bridge-start',
      payload: {
        primaryFeeds: 1,
        backupFeeds: 1,
        synthSnapshots: 3
      },
      severity: 'INFO',
      createdAt: new Date()
    })
    logger.info('失效回退监控设置完成')
  }

  /** Returns the historical average perturbed by a random factor within ±volatility. */
  private generateSyntheticPrice(symbol: string): number {
    const basePrice = this.getHistoricalAverage(symbol)
    const volatility = this.getVolatility(symbol)
    const randomFactor = (Math.random() - 0.5) * 2 * volatility

    return basePrice * (1 + randomFactor)
  }

  /** Looks up the fixed reference price for a symbol; unknown symbols get 1000. */
  private getHistoricalAverage(symbol: string): number {
    const averages: Record<string, number> = {
      'BTC': 108000,
      'ETH': 3200,
      'SOL': 180
    }
    return averages[symbol] ?? 1000
  }

  /** Looks up the fixed volatility for a symbol; unknown symbols get 0.02. */
  private getVolatility(symbol: string): number {
    const volatilities: Record<string, number> = {
      'BTC': 0.02,
      'ETH': 0.03,
      'SOL': 0.04
    }
    return volatilities[symbol] ?? 0.02
  }

  /** Picks a trend at random; the symbol does not influence the result. */
  private getMarketTrend(_symbol: string): 'bullish' | 'bearish' | 'neutral' {
    const trends = ['bullish', 'bearish', 'neutral'] as const
    return trends[Math.floor(Math.random() * trends.length)] ?? 'neutral'
  }

  /**
   * Summarises the bridge state.
   *
   * @returns Feed counts, snapshot count, number of failover attempts, time of
   *   the last successful failover, the current data source and a freshly
   *   assessed data quality.
   */
  getBridgeStats(): any {
    return {
      primaryFeeds: 1,
      backupFeeds: 1,
      synthPriceSnapshots: this.synthPriceSnapshotManager.getAllSnapshots().length,
      failoverEvents: this.failoverStats.totalFailovers,
      lastFailoverTime: this.failoverStats.lastFailoverTime,
      currentDataSource: this.currentDataSource,
      dataQuality: this.assessDataQuality(),
      bridgeMode: true
    }
  }
}
/**
 * Runs the T031 demo end to end: builds the mock components, starts the
 * bridge, performs a manual failover, simulates an order book manager error,
 * waits for the periodic health check, logs a summary and stops the bridge.
 * Any failure is logged and terminates the process with exit code 1.
 */
async function main() {
  logger.info('=== T031 市场数据失效回退桥接演示 ===')
  try {
    // 1. Build the mock components.
    const orderbookManager = new MockPacificaOrderbookManager()
    const failoverService = new MockMarketDataFailoverService()
    const feedManager = new MockMarketDataFeedManager()
    const snapshotManager = new MockSynthPriceSnapshotManager()
    const eventManager = new MockMonitoringEventManager()

    // 2. Bring up the order book manager.
    await orderbookManager.initialize()
    await orderbookManager.start()

    // 3. Wire the bridge to the mocks.
    const bridge = new SimpleMarketDataFailoverBridge(
      orderbookManager,
      failoverService,
      feedManager,
      snapshotManager,
      eventManager
    )

    // 4. Start the bridge.
    logger.info('启动市场数据失效回退桥接器...')
    await bridge.start()

    // 5. Report the initial state.
    logger.info('初始市场数据桥接统计:', bridge.getBridgeStats())

    // 6. Manual failover test.
    logger.info('执行手动失效回退测试...')
    const manualFailover = await bridge.triggerFailover('manual_test')
    logger.info('失效回退结果:', manualFailover)

    // 7. Simulate an error on the primary data source.
    logger.info('模拟主要数据源错误...')
    orderbookManager.triggerError(new Error('模拟WebSocket连接错误'))

    // 8. Give the periodic health check time to run.
    const pause = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))
    await pause(6000)

    // 9. Report the state after the failover.
    const closingStats = bridge.getBridgeStats()
    logger.info('最终市场数据桥接统计:', closingStats)

    // 10. Consistency summary.
    logger.info('验证数据一致性...')
    const feeds = feedManager.getAllFeeds()
    const snapshots = snapshotManager.getAllSnapshots()
    const events = eventManager.getAllEvents()
    const mark = (ok: boolean) => (ok ? '✅' : '❌')
    const summaryLines = [
      '验证结果:',
      `- 市场数据源: ${feeds.length}`,
      `- 合成价格快照: ${snapshots.length}`,
      `- 监控事件: ${events.length}`,
      `- 当前数据源: ${closingStats.currentDataSource}`,
      `- 数据质量: ${closingStats.dataQuality}`,
      `- 失效回退事件: ${closingStats.failoverEvents}`,
      `- 桥接模式: ${mark(Boolean(closingStats.bridgeMode))}`,
      `- 数据一致性: ${mark(feeds.length > 0 && snapshots.length > 0)}`
    ]
    for (const line of summaryLines) {
      logger.info(line)
    }

    // 11. Shut the bridge down.
    await bridge.stop()
    logger.info('=== T031 市场数据失效回退桥接演示完成 ===')
  } catch (error: any) {
    logger.error('T031 演示失败:', { error: error.message, stack: error.stack })
    process.exit(1)
  }
}
- // 运行演示
// Run the demo only when this file is the process entry point.
// `import.meta.url` is a percent-encoded file URL, so comparing it with a
// hand-built `file://${path}` string fails for paths that contain spaces or
// non-ASCII characters, and for Windows paths. `pathToFileURL` produces the
// same encoding as `import.meta.url`.
const entryScriptPath = process.argv[1]
if (entryScriptPath !== undefined && import.meta.url === pathToFileURL(entryScriptPath).href) {
  main().catch((error: unknown) => {
    logger.error('演示运行失败:', error)
    process.exit(1)
  })
}
|