t031_market_data_failover_demo.ts 17 KB


  1. /**
  2. * T031 演示:市场数据失效回退桥接服务
  3. * 演示如何集成现有的 PacificaOrderbookManager 与新的 MarketDataFailoverService
  4. */
  5. import { logger } from '../src/utils/logger.js'
  6. // 模拟现有的 PacificaOrderbookManager
  7. class MockPacificaOrderbookManager {
  8. private isRunning = false
  9. private orderbookStats: any[] = []
  10. async initialize(): Promise<void> {
  11. logger.info('模拟 PacificaOrderbookManager 初始化')
  12. }
  13. async start(): Promise<void> {
  14. logger.info('模拟 PacificaOrderbookManager 启动')
  15. this.isRunning = true
  16. }
  17. async stop(): Promise<void> {
  18. logger.info('模拟 PacificaOrderbookManager 停止')
  19. this.isRunning = false
  20. }
  21. getStatus(): any {
  22. return {
  23. name: 'PacificaOrderbookManager',
  24. status: this.isRunning ? 'running' : 'stopped',
  25. lastUpdate: Date.now(),
  26. details: {
  27. isConnected: this.isRunning,
  28. cachedSymbols: ['BTC', 'ETH', 'SOL'],
  29. totalCacheSize: 3
  30. }
  31. }
  32. }
  33. getOrderbookStats(): any[] {
  34. // 模拟返回订单簿统计数据
  35. const now = Date.now()
  36. return [
  37. {
  38. symbol: 'BTC',
  39. spread: 100,
  40. spreadPercent: 0.001,
  41. midPrice: 108000,
  42. bestBid: 107950,
  43. bestAsk: 108050,
  44. totalBidSize: 1.5,
  45. totalAskSize: 1.2,
  46. depthLevels: 10,
  47. updateFrequency: 2.0,
  48. lastUpdate: now - 5000 // 5秒前更新
  49. },
  50. {
  51. symbol: 'ETH',
  52. spread: 20,
  53. spreadPercent: 0.002,
  54. midPrice: 3200,
  55. bestBid: 3190,
  56. bestAsk: 3210,
  57. totalBidSize: 10.0,
  58. totalAskSize: 8.5,
  59. depthLevels: 8,
  60. updateFrequency: 1.8,
  61. lastUpdate: now - 3000 // 3秒前更新
  62. }
  63. ]
  64. }
  65. // 模拟触发错误事件
  66. triggerError(error: Error): void {
  67. logger.warn('模拟订单簿管理器错误')
  68. this.emit('error', error)
  69. }
  70. // 添加事件发射器功能
  71. private listeners = new Map<string, Function[]>()
  72. on(event: string, listener: Function): void {
  73. if (!this.listeners.has(event)) {
  74. this.listeners.set(event, [])
  75. }
  76. this.listeners.get(event)!.push(listener)
  77. }
  78. emit(event: string, ...args: any[]): void {
  79. const eventListeners = this.listeners.get(event)
  80. if (eventListeners) {
  81. eventListeners.forEach(listener => listener(...args))
  82. }
  83. }
  84. }
  85. // 模拟 MarketDataFailoverService
  86. class MockMarketDataFailoverService {
  87. async triggerFailover(request: any): Promise<any> {
  88. logger.info(`触发失效回退: ${request.triggerReason}`)
  89. // 模拟失效回退过程
  90. await new Promise(resolve => setTimeout(resolve, 200))
  91. return {
  92. success: true,
  93. newDataSource: 'backup',
  94. failoverDuration: 200,
  95. originalDataSource: 'primary',
  96. triggerReason: request.triggerReason
  97. }
  98. }
  99. getFailoverStats(): any {
  100. return {
  101. totalFailovers: 5,
  102. successfulFailovers: 4,
  103. failedFailovers: 1,
  104. averageFailoverTime: 150,
  105. lastFailoverTime: Date.now() - 30000
  106. }
  107. }
  108. }
  109. // 模拟其他管理器
  110. class MockMarketDataFeedManager {
  111. private feeds = new Map<string, any>()
  112. addFeed(feed: any): void {
  113. this.feeds.set(feed.feedId, feed)
  114. logger.info(`添加市场数据源: ${feed.feedId}`)
  115. }
  116. getAllFeeds(): any[] {
  117. return Array.from(this.feeds.values())
  118. }
  119. }
  120. class MockSynthPriceSnapshotManager {
  121. private snapshots = new Map<string, any>()
  122. addSnapshot(snapshot: any): void {
  123. this.snapshots.set(snapshot.snapshotId, snapshot)
  124. logger.info(`添加合成价格快照: ${snapshot.symbol}`)
  125. }
  126. getAllSnapshots(): any[] {
  127. return Array.from(this.snapshots.values())
  128. }
  129. }
  130. class MockMonitoringEventManager {
  131. private events: any[] = []
  132. addEvent(event: any): void {
  133. this.events.push(event)
  134. logger.info(`添加监控事件: ${event.type}`)
  135. }
  136. getAllEvents(): any[] {
  137. return this.events
  138. }
  139. }
  140. // 简化的市场数据失效回退桥接器实现
  141. class SimpleMarketDataFailoverBridge {
  142. private isRunning = false
  143. private healthCheckInterval?: NodeJS.Timeout
  144. private currentDataSource: 'primary' | 'backup' | 'synthetic' = 'primary'
  145. private failoverStats = {
  146. totalFailovers: 0,
  147. successfulFailovers: 0,
  148. failedFailovers: 0,
  149. lastFailoverTime: 0,
  150. lastError: null as string | null
  151. }
  152. constructor(
  153. private pacificaOrderbookManager: MockPacificaOrderbookManager,
  154. private marketDataFailoverService: MockMarketDataFailoverService,
  155. private marketDataFeedManager: MockMarketDataFeedManager,
  156. private synthPriceSnapshotManager: MockSynthPriceSnapshotManager,
  157. private monitoringManager: MockMonitoringEventManager
  158. ) {}
  159. async start(): Promise<void> {
  160. logger.info('启动简化市场数据失效回退桥接器')
  161. try {
  162. await this.initializeMarketDataFeeds()
  163. this.startHealthCheck()
  164. this.setupFailoverMonitoring()
  165. this.isRunning = true
  166. logger.info('简化市场数据失效回退桥接器启动成功')
  167. } catch (error: any) {
  168. logger.error(`启动失败: ${error.message}`)
  169. throw error
  170. }
  171. }
  172. async stop(): Promise<void> {
  173. logger.info('停止简化市场数据失效回退桥接器')
  174. if (this.healthCheckInterval) {
  175. clearInterval(this.healthCheckInterval)
  176. }
  177. this.isRunning = false
  178. }
  179. private async initializeMarketDataFeeds(): Promise<void> {
  180. logger.info('初始化市场数据源')
  181. // 创建主要数据源
  182. const primaryFeed = {
  183. feedId: 'pacifica-ws-primary',
  184. exchange: 'pacifica',
  185. symbol: 'BTC',
  186. type: 'websocket',
  187. wsEndpoint: 'wss://ws.pacifica.fi/ws',
  188. httpEndpoint: 'https://api.pacifica.fi/api/v1',
  189. status: 'active',
  190. priority: 1,
  191. latency: 50,
  192. reliability: 0.95,
  193. lastUpdate: new Date(),
  194. createdAt: new Date(),
  195. updatedAt: new Date()
  196. }
  197. this.marketDataFeedManager.addFeed(primaryFeed)
  198. // 创建备用数据源
  199. const backupFeed = {
  200. feedId: 'pacifica-http-backup',
  201. exchange: 'pacifica',
  202. symbol: 'BTC',
  203. type: 'http',
  204. wsEndpoint: '',
  205. httpEndpoint: 'https://api.pacifica.fi/api/v1',
  206. status: 'standby',
  207. priority: 2,
  208. latency: 200,
  209. reliability: 0.90,
  210. lastUpdate: new Date(),
  211. createdAt: new Date(),
  212. updatedAt: new Date()
  213. }
  214. this.marketDataFeedManager.addFeed(backupFeed)
  215. // 初始化合成价格快照
  216. await this.initializeSynthPriceSnapshots()
  217. logger.info('市场数据源初始化完成')
  218. }
  219. private async initializeSynthPriceSnapshots(): Promise<void> {
  220. logger.info('初始化合成价格快照')
  221. const symbols = ['BTC', 'ETH', 'SOL']
  222. for (const symbol of symbols) {
  223. const synthSnapshot = {
  224. snapshotId: `synth-${symbol.toLowerCase()}-${Date.now()}`,
  225. symbol,
  226. source: 'synthetic',
  227. price: this.generateSyntheticPrice(symbol),
  228. confidence: 0.8,
  229. factors: {
  230. historicalAverage: this.getHistoricalAverage(symbol),
  231. volatility: this.getVolatility(symbol),
  232. marketTrend: this.getMarketTrend(symbol)
  233. },
  234. createdAt: new Date(),
  235. updatedAt: new Date()
  236. }
  237. this.synthPriceSnapshotManager.addSnapshot(synthSnapshot)
  238. }
  239. logger.info('合成价格快照初始化完成')
  240. }
  241. private startHealthCheck(): void {
  242. logger.info('启动市场数据健康检查')
  243. this.healthCheckInterval = setInterval(async () => {
  244. try {
  245. await this.performHealthCheck()
  246. } catch (error: any) {
  247. logger.error(`健康检查失败: ${error.message}`)
  248. }
  249. }, 5000) // 5秒检查一次
  250. // 立即执行一次健康检查
  251. this.performHealthCheck().catch(error => {
  252. logger.error(`初始健康检查失败: ${error.message}`)
  253. })
  254. }
  255. private async performHealthCheck(): Promise<void> {
  256. if (!this.isRunning) return
  257. logger.debug('执行市场数据健康检查')
  258. try {
  259. const primaryFeedStatus = this.pacificaOrderbookManager.getStatus()
  260. if (primaryFeedStatus.status !== 'running') {
  261. logger.warn('主要数据源状态异常,检查是否需要失效回退')
  262. await this.checkFailoverNeeded('primary_feed_unhealthy')
  263. }
  264. const dataQuality = this.assessDataQuality()
  265. if (dataQuality === 'poor') {
  266. logger.warn('数据质量较差,触发失效回退')
  267. await this.triggerFailover('data_quality_poor')
  268. }
  269. // 添加监控事件
  270. this.monitoringManager.addEvent({
  271. eventId: `health-check-${Date.now()}`,
  272. type: 'market-data-health-check',
  273. payload: {
  274. primaryFeedStatus: primaryFeedStatus.status,
  275. dataQuality,
  276. currentDataSource: this.currentDataSource,
  277. failoverStats: this.failoverStats
  278. },
  279. severity: dataQuality === 'poor' ? 'WARN' : 'INFO',
  280. createdAt: new Date()
  281. })
  282. } catch (error: any) {
  283. logger.error(`健康检查执行失败: ${error.message}`)
  284. }
  285. }
  286. private async checkFailoverNeeded(reason: string): Promise<void> {
  287. try {
  288. const failoverResult = await this.marketDataFailoverService.triggerFailover({
  289. triggerReason: reason,
  290. timeoutMs: 10000,
  291. maxAttempts: 3
  292. })
  293. if (failoverResult.success) {
  294. this.currentDataSource = failoverResult.newDataSource as any
  295. this.failoverStats.successfulFailovers++
  296. this.failoverStats.lastFailoverTime = Date.now()
  297. logger.info(`失效回退成功,切换到: ${this.currentDataSource}`)
  298. } else {
  299. this.failoverStats.failedFailovers++
  300. this.failoverStats.lastError = failoverResult.error
  301. logger.error(`失效回退失败: ${failoverResult.error}`)
  302. }
  303. this.failoverStats.totalFailovers++
  304. } catch (error: any) {
  305. this.failoverStats.failedFailovers++
  306. this.failoverStats.lastError = error.message
  307. logger.error(`检查失效回退需求失败: ${error.message}`)
  308. }
  309. }
  310. async triggerFailover(reason: string): Promise<{
  311. success: boolean
  312. message: string
  313. newDataSource?: string
  314. }> {
  315. try {
  316. const failoverResult = await this.marketDataFailoverService.triggerFailover({
  317. triggerReason: reason,
  318. timeoutMs: 10000,
  319. maxAttempts: 3
  320. })
  321. if (failoverResult.success) {
  322. this.currentDataSource = failoverResult.newDataSource as any
  323. this.failoverStats.successfulFailovers++
  324. this.failoverStats.lastFailoverTime = Date.now()
  325. return {
  326. success: true,
  327. message: `失效回退成功,切换到: ${this.currentDataSource}`,
  328. newDataSource: this.currentDataSource
  329. }
  330. } else {
  331. this.failoverStats.failedFailovers++
  332. this.failoverStats.lastError = failoverResult.error
  333. return {
  334. success: false,
  335. message: `失效回退失败: ${failoverResult.error}`
  336. }
  337. }
  338. } catch (error: any) {
  339. this.failoverStats.failedFailovers++
  340. this.failoverStats.lastError = error.message
  341. return {
  342. success: false,
  343. message: `失效回退异常: ${error.message}`
  344. }
  345. }
  346. }
  347. private assessDataQuality(): 'excellent' | 'good' | 'degraded' | 'poor' {
  348. try {
  349. const orderbookStats = this.pacificaOrderbookManager.getOrderbookStats()
  350. if (orderbookStats.length === 0) {
  351. return 'poor'
  352. }
  353. const now = Date.now()
  354. const maxAge = 30000 // 30秒
  355. const staleData = orderbookStats.some(stat => (now - stat.lastUpdate) > maxAge)
  356. if (staleData) {
  357. return 'degraded'
  358. }
  359. const highSpread = orderbookStats.some(stat => stat.spreadPercent > 0.01)
  360. if (highSpread) {
  361. return 'degraded'
  362. }
  363. const lowFrequency = orderbookStats.some(stat => stat.updateFrequency < 0.5)
  364. if (lowFrequency) {
  365. return 'good'
  366. }
  367. return 'excellent'
  368. } catch (error: any) {
  369. logger.error(`评估数据质量失败: ${error.message}`)
  370. return 'poor'
  371. }
  372. }
  373. private setupFailoverMonitoring(): void {
  374. logger.info('设置失效回退监控')
  375. // 监听订单簿管理器的错误事件
  376. this.pacificaOrderbookManager.on('error', async (error) => {
  377. logger.warn('订单簿管理器错误,检查失效回退', { error: error.message })
  378. await this.checkFailoverNeeded('orderbook_manager_error')
  379. })
  380. // 创建失效回退启动事件
  381. this.monitoringManager.addEvent({
  382. eventId: `failover-bridge-start-${Date.now()}`,
  383. type: 'failover-bridge-start',
  384. payload: {
  385. primaryFeeds: 1,
  386. backupFeeds: 1,
  387. synthSnapshots: 3
  388. },
  389. severity: 'INFO',
  390. createdAt: new Date()
  391. })
  392. logger.info('失效回退监控设置完成')
  393. }
  394. private generateSyntheticPrice(symbol: string): number {
  395. const basePrice = this.getHistoricalAverage(symbol)
  396. const volatility = this.getVolatility(symbol)
  397. const randomFactor = (Math.random() - 0.5) * 2 * volatility
  398. return basePrice * (1 + randomFactor)
  399. }
  400. private getHistoricalAverage(symbol: string): number {
  401. const averages: Record<string, number> = {
  402. 'BTC': 108000,
  403. 'ETH': 3200,
  404. 'SOL': 180
  405. }
  406. return averages[symbol] || 1000
  407. }
  408. private getVolatility(symbol: string): number {
  409. const volatilities: Record<string, number> = {
  410. 'BTC': 0.02,
  411. 'ETH': 0.03,
  412. 'SOL': 0.04
  413. }
  414. return volatilities[symbol] || 0.02
  415. }
  416. private getMarketTrend(symbol: string): 'bullish' | 'bearish' | 'neutral' {
  417. const trends = ['bullish', 'bearish', 'neutral'] as const
  418. return trends[Math.floor(Math.random() * trends.length)]
  419. }
  420. getBridgeStats(): any {
  421. return {
  422. primaryFeeds: 1,
  423. backupFeeds: 1,
  424. synthPriceSnapshots: this.synthPriceSnapshotManager.getAllSnapshots().length,
  425. failoverEvents: this.failoverStats.totalFailovers,
  426. lastFailoverTime: this.failoverStats.lastFailoverTime,
  427. currentDataSource: this.currentDataSource,
  428. dataQuality: this.assessDataQuality(),
  429. bridgeMode: true
  430. }
  431. }
  432. }
  433. async function main() {
  434. logger.info('=== T031 市场数据失效回退桥接演示 ===')
  435. try {
  436. // 1. 创建模拟组件
  437. const mockOrderbookManager = new MockPacificaOrderbookManager()
  438. const mockFailoverService = new MockMarketDataFailoverService()
  439. const mockFeedManager = new MockMarketDataFeedManager()
  440. const mockSynthSnapshotManager = new MockSynthPriceSnapshotManager()
  441. const mockMonitoringManager = new MockMonitoringEventManager()
  442. // 2. 初始化订单簿管理器
  443. await mockOrderbookManager.initialize()
  444. await mockOrderbookManager.start()
  445. // 3. 创建市场数据失效回退桥接器
  446. const marketDataBridge = new SimpleMarketDataFailoverBridge(
  447. mockOrderbookManager,
  448. mockFailoverService,
  449. mockFeedManager,
  450. mockSynthSnapshotManager,
  451. mockMonitoringManager
  452. )
  453. // 4. 启动市场数据桥接器
  454. logger.info('启动市场数据失效回退桥接器...')
  455. await marketDataBridge.start()
  456. // 5. 检查初始状态
  457. const initialStats = marketDataBridge.getBridgeStats()
  458. logger.info('初始市场数据桥接统计:', initialStats)
  459. // 6. 执行手动失效回退测试
  460. logger.info('执行手动失效回退测试...')
  461. const failoverResult = await marketDataBridge.triggerFailover('manual_test')
  462. logger.info('失效回退结果:', failoverResult)
  463. // 7. 模拟数据源错误
  464. logger.info('模拟主要数据源错误...')
  465. mockOrderbookManager.triggerError(new Error('模拟WebSocket连接错误'))
  466. // 8. 等待健康检查处理
  467. await new Promise(resolve => setTimeout(resolve, 6000))
  468. // 9. 检查失效回退后状态
  469. const finalStats = marketDataBridge.getBridgeStats()
  470. logger.info('最终市场数据桥接统计:', finalStats)
  471. // 10. 验证数据一致性
  472. logger.info('验证数据一致性...')
  473. const marketDataFeeds = mockFeedManager.getAllFeeds()
  474. const synthSnapshots = mockSynthSnapshotManager.getAllSnapshots()
  475. const monitoringEvents = mockMonitoringManager.getAllEvents()
  476. logger.info(`验证结果:`)
  477. logger.info(`- 市场数据源: ${marketDataFeeds.length}`)
  478. logger.info(`- 合成价格快照: ${synthSnapshots.length}`)
  479. logger.info(`- 监控事件: ${monitoringEvents.length}`)
  480. logger.info(`- 当前数据源: ${finalStats.currentDataSource}`)
  481. logger.info(`- 数据质量: ${finalStats.dataQuality}`)
  482. logger.info(`- 失效回退事件: ${finalStats.failoverEvents}`)
  483. logger.info(`- 桥接模式: ${finalStats.bridgeMode ? '✅' : '❌'}`)
  484. logger.info(`- 数据一致性: ${marketDataFeeds.length > 0 && synthSnapshots.length > 0 ? '✅' : '❌'}`)
  485. // 11. 停止桥接器
  486. await marketDataBridge.stop()
  487. logger.info('=== T031 市场数据失效回退桥接演示完成 ===')
  488. } catch (error: any) {
  489. logger.error('T031 演示失败:', { error: error.message, stack: error.stack })
  490. process.exit(1)
  491. }
  492. }
  493. // 运行演示
  494. if (import.meta.url === `file://${process.argv[1]}`) {
  495. main().catch(error => {
  496. logger.error('演示运行失败:', error)
  497. process.exit(1)
  498. })
  499. }