import { EventEmitter } from 'events' import { logger } from '../../utils/logger.js' import { PacificaProxyClient } from '../../exchanges/pacifica/PacificaProxyClient.js' import { OrderCreatePayload } from '../../exchanges/pacifica/OrdersAdapter.js' /** * 价格收敛管理器 - 管理两个账户的开仓价格逐步接近,并自动设置止盈止损 */ export class PriceConvergenceManager extends EventEmitter { private accountPairs: Map = new Map() private clients: Map = new Map() private config: ConvergenceConfig private isActive: boolean = false private monitoringInterval?: NodeJS.Timeout constructor(config?: Partial) { super() this.config = { maxPriceDeviation: config?.maxPriceDeviation || 0.005, // 0.5% 最大价格偏差 convergenceStepPercent: config?.convergenceStepPercent || 0.001, // 0.1% 收敛步长 minOrderSize: config?.minOrderSize || 0.001, // 最小订单量 maxOrderSize: config?.maxOrderSize || 1.0, // 最大订单量 checkIntervalMs: config?.checkIntervalMs || 3000, // 3秒检查间隔 takeProfitPercent: config?.takeProfitPercent || 0.02, // 2% 止盈 stopLossPercent: config?.stopLossPercent || 0.01, // 1% 止损 maxDailyOrders: config?.maxDailyOrders || 50, // 每日最大订单数 enableTrailingStop: config?.enableTrailingStop || false, // 启用追踪止损 trailingStopPercent: config?.trailingStopPercent || 0.005, // 0.5% 追踪止损 } } /** * 添加账户对 */ addAccountPair(pairId: string, account1Config: AccountConfig, account2Config: AccountConfig, symbol: string): void { // 创建交易客户端 const client1 = new PacificaProxyClient({ account: account1Config.account, privateKey: account1Config.privateKey, agentWallet: account1Config.agentWallet, agentPrivateKey: account1Config.agentPrivateKey, }) const client2 = new PacificaProxyClient({ account: account2Config.account, privateKey: account2Config.privateKey, agentWallet: account2Config.agentWallet, agentPrivateKey: account2Config.agentPrivateKey, }) this.clients.set(account1Config.account, client1) this.clients.set(account2Config.account, client2) const accountPair: AccountPair = { id: pairId, account1: account1Config, account2: account2Config, symbol, account1Position: null, account2Position: null, targetPriceDeviation: 0, currentPriceDeviation: 0, lastUpdate: 0, dailyOrderCount: 0, lastOrderDate: '', isConverged: false, stopLossOrders: new Map(), takeProfitOrders: new Map(), } this.accountPairs.set(pairId, accountPair) logger.info('添加账户对', { pairId, account1: account1Config.account, account2: account2Config.account, symbol, }) } /** * 开始价格收敛监控 */ startConvergenceMonitoring(): void { if (this.isActive) return this.isActive = true this.monitoringInterval = setInterval(() => { this.performConvergenceCheck() }, this.config.checkIntervalMs) logger.info('价格收敛监控已启动', { interval: this.config.checkIntervalMs, pairs: Array.from(this.accountPairs.keys()), }) this.emit('convergenceMonitoringStarted') } /** * 停止价格收敛监控 */ stopConvergenceMonitoring(): void { if (!this.isActive) return this.isActive = false if (this.monitoringInterval) { clearInterval(this.monitoringInterval) this.monitoringInterval = undefined } logger.info('价格收敛监控已停止') this.emit('convergenceMonitoringStopped') } /** * 执行收敛检查 */ private async performConvergenceCheck(): Promise { for (const [pairId, pair] of this.accountPairs) { try { await this.updatePositions(pair) await this.checkAndExecuteConvergence(pair) } catch (error: any) { logger.error('收敛检查失败', { pairId, error: error.message, }) } } } /** * 更新账户仓位信息 */ private async updatePositions(pair: AccountPair): Promise { try { const client1 = this.clients.get(pair.account1.account)! const client2 = this.clients.get(pair.account2.account)! const [positions1, positions2] = await Promise.all([client1.getPositions(), client2.getPositions()]) // 查找目标交易对的仓位 pair.account1Position = this.findPositionBySymbol(positions1, pair.symbol) pair.account2Position = this.findPositionBySymbol(positions2, pair.symbol) pair.lastUpdate = Date.now() // 计算当前价格偏差 if (pair.account1Position && pair.account2Position) { const price1 = pair.account1Position.averagePrice || pair.account1Position.entryPrice const price2 = pair.account2Position.averagePrice || pair.account2Position.entryPrice if (price1 && price2) { pair.currentPriceDeviation = Math.abs(price1 - price2) / ((price1 + price2) / 2) pair.isConverged = pair.currentPriceDeviation <= this.config.maxPriceDeviation } } logger.debug('更新仓位信息', { pairId: pair.id, account1Position: pair.account1Position ? 'Found' : 'None', account2Position: pair.account2Position ? 'Found' : 'None', currentDeviation: pair.currentPriceDeviation.toFixed(4), isConverged: pair.isConverged, }) } catch (error: any) { logger.error('更新仓位信息失败', { pairId: pair.id, error: error.message, }) } } /** * 检查并执行收敛操作 */ private async checkAndExecuteConvergence(pair: AccountPair): Promise { // 检查是否需要收敛 if (pair.isConverged) { return // 已收敛,无需操作 } // 检查每日订单限制 const today = new Date().toDateString() if (pair.lastOrderDate !== today) { pair.dailyOrderCount = 0 pair.lastOrderDate = today } if (pair.dailyOrderCount >= this.config.maxDailyOrders) { logger.warn('达到每日订单限制', { pairId: pair.id, dailyCount: pair.dailyOrderCount, maxDaily: this.config.maxDailyOrders, }) return } // 执行收敛交易 await this.executeConvergenceTrade(pair) } /** * 执行收敛交易 */ private async executeConvergenceTrade(pair: AccountPair): Promise { try { // 获取当前市场价格 const marketPrice = await this.getCurrentMarketPrice(pair.symbol) if (!marketPrice) { logger.warn('无法获取市场价格', { symbol: pair.symbol }) return } // 计算收敛交易参数 const tradeParams = this.calculateConvergenceTradeParams(pair, marketPrice) if (!tradeParams) { logger.debug('无需收敛交易', { pairId: pair.id }) return } // 执行交易 const results = await this.executeTrades(pair, tradeParams, marketPrice) // 设置止盈止损 if (results.some(r => r.success)) { await this.setStopLossAndTakeProfit(pair, marketPrice) } // 更新统计 pair.dailyOrderCount += results.filter(r => r.success).length logger.info('收敛交易执行完成', { pairId: pair.id, successful: results.filter(r => r.success).length, failed: results.filter(r => !r.success).length, marketPrice, }) this.emit('convergenceTradeExecuted', { pairId: pair.id, results, tradeParams, marketPrice, }) } catch (error: any) { logger.error('收敛交易执行失败', { pairId: pair.id, error: error.message, }) } } /** * 计算收敛交易参数 */ private calculateConvergenceTradeParams(pair: AccountPair, marketPrice: number): ConvergenceTradeParams | null { const pos1 = pair.account1Position const pos2 = pair.account2Position // 如果都没有仓位,创建初始仓位 if (!pos1 && !pos2) { const orderSize = this.config.minOrderSize return { account1Trade: { action: 'open_long', size: orderSize, expectedPrice: marketPrice * (1 - this.config.convergenceStepPercent), }, account2Trade: { action: 'open_short', size: orderSize, expectedPrice: marketPrice * (1 + this.config.convergenceStepPercent), }, convergenceType: 'initial_hedge', } } // 如果价格偏差过大,执行收敛交易 if (pair.currentPriceDeviation > this.config.maxPriceDeviation) { if (pos1 && pos2) { const price1 = pos1.averagePrice || pos1.entryPrice const price2 = pos2.averagePrice || pos2.entryPrice if (!price1 || !price2) return null // 计算收敛方向:价格高的账户减仓,价格低的账户加仓 const adjustmentSize = Math.min( this.config.maxOrderSize, Math.max(this.config.minOrderSize, Math.abs(pos1.size - pos2.size) * 0.1), ) if (price1 > price2) { return { account1Trade: { action: pos1.size > 0 ? 'reduce_long' : 'reduce_short', size: adjustmentSize, expectedPrice: marketPrice, }, account2Trade: { action: pos2.size > 0 ? 'increase_long' : 'increase_short', size: adjustmentSize, expectedPrice: marketPrice, }, convergenceType: 'price_adjustment', } } else { return { account1Trade: { action: pos1.size > 0 ? 'increase_long' : 'increase_short', size: adjustmentSize, expectedPrice: marketPrice, }, account2Trade: { action: pos2.size > 0 ? 'reduce_long' : 'reduce_short', size: adjustmentSize, expectedPrice: marketPrice, }, convergenceType: 'price_adjustment', } } } } return null } /** * 执行交易 */ private async executeTrades( pair: AccountPair, tradeParams: ConvergenceTradeParams, marketPrice: number, ): Promise { const results: TradeResult[] = [] // 执行账户1的交易 if (tradeParams.account1Trade) { try { const result = await this.executeAccountTrade( pair.account1, tradeParams.account1Trade, pair.symbol, marketPrice, ) results.push({ account: pair.account1.account, ...result }) } catch (error: any) { results.push({ account: pair.account1.account, success: false, error: error.message, }) } } // 执行账户2的交易 if (tradeParams.account2Trade) { try { const result = await this.executeAccountTrade( pair.account2, tradeParams.account2Trade, pair.symbol, marketPrice, ) results.push({ account: pair.account2.account, ...result }) } catch (error: any) { results.push({ account: pair.account2.account, success: false, error: error.message, }) } } return results } /** * 执行单个账户的交易 */ private async executeAccountTrade( account: AccountConfig, trade: TradeAction, symbol: string, marketPrice: number, ): Promise> { const client = this.clients.get(account.account)! let side: 'bid' | 'ask' let reduceOnly = false switch (trade.action) { case 'open_long': case 'increase_long': side = 'bid' break case 'open_short': case 'increase_short': side = 'ask' break case 'reduce_long': side = 'ask' reduceOnly = true break case 'reduce_short': side = 'bid' reduceOnly = true break default: throw new Error(`不支持的交易动作: ${trade.action}`) } const payload: OrderCreatePayload = { account: account.account, symbol, amount: trade.size.toString(), side, reduceOnly, slippagePercent: '0.5', } logger.info('执行账户交易', { account: account.account, action: trade.action, size: trade.size, side, reduceOnly, expectedPrice: trade.expectedPrice, marketPrice, }) const result = await client.createMarketOrder(payload) return { success: result.success || false, orderId: result.orderId || result.order_id, executedPrice: marketPrice, // 市价单,使用市场价格 executedSize: trade.size, } } /** * 设置止盈止损 */ private async setStopLossAndTakeProfit(pair: AccountPair, marketPrice: number): Promise { try { // 为每个账户设置止盈止损 await Promise.all([ this.setAccountStopOrders(pair.account1, pair.symbol, marketPrice), this.setAccountStopOrders(pair.account2, pair.symbol, marketPrice), ]) logger.info('止盈止损设置完成', { pairId: pair.id, marketPrice, takeProfitPercent: this.config.takeProfitPercent, stopLossPercent: this.config.stopLossPercent, }) } catch (error: any) { logger.error('设置止盈止损失败', { pairId: pair.id, error: error.message, }) } } /** * 为单个账户设置止盈止损单 */ private async setAccountStopOrders(account: AccountConfig, symbol: string, marketPrice: number): Promise { const client = this.clients.get(account.account)! // 获取当前仓位 const positions = await client.getPositions() const position = this.findPositionBySymbol(positions, symbol) if (!position || position.size === 0) return const isLong = position.size > 0 const positionSize = Math.abs(position.size) // 计算止盈止损价格 const takeProfitPrice = isLong ? marketPrice * (1 + this.config.takeProfitPercent) : marketPrice * (1 - this.config.takeProfitPercent) const stopLossPrice = isLong ? marketPrice * (1 - this.config.stopLossPercent) : marketPrice * (1 + this.config.stopLossPercent) // 设置止盈单 try { const takeProfitPayload: OrderCreatePayload = { account: account.account, symbol, amount: positionSize.toString(), side: isLong ? 'ask' : 'bid', reduceOnly: true, orderType: 'limit', price: takeProfitPrice.toString(), slippagePercent: '0.1', } const takeProfitResult = await client.createLimitOrder(takeProfitPayload) logger.info('止盈单设置成功', { account: account.account, orderId: takeProfitResult.orderId, price: takeProfitPrice, size: positionSize, }) } catch (error: any) { logger.error('设置止盈单失败', { account: account.account, error: error.message, }) } // 设置止损单 try { const stopLossPayload: OrderCreatePayload = { account: account.account, symbol, amount: positionSize.toString(), side: isLong ? 'ask' : 'bid', reduceOnly: true, orderType: 'stop_market', stopPrice: stopLossPrice.toString(), slippagePercent: '1.0', // 止损允许更大滑点 } const stopLossResult = await client.createStopOrder(stopLossPayload) logger.info('止损单设置成功', { account: account.account, orderId: stopLossResult.orderId, stopPrice: stopLossPrice, size: positionSize, }) } catch (error: any) { logger.error('设置止损单失败', { account: account.account, error: error.message, }) } } /** * 获取当前市场价格 */ private async getCurrentMarketPrice(symbol: string): Promise { try { // 使用第一个可用的客户端获取价格 const client = this.clients.values().next().value as PacificaProxyClient if (!client) return null const ticker = await client.getTicker(symbol) return ticker?.price || null } catch (error: any) { logger.error('获取市场价格失败', { symbol, error: error.message, }) return null } } /** * 根据交易对查找仓位 */ private findPositionBySymbol(positions: any[], symbol: string): Position | null { const position = positions.find(p => p.symbol === symbol || p.market === symbol) if (!position) return null return { symbol: position.symbol || position.market, size: parseFloat(position.size || position.amount || '0'), side: position.side || (parseFloat(position.size || position.amount || '0') > 0 ? 'long' : 'short'), entryPrice: parseFloat(position.entryPrice || position.avgPrice || '0'), averagePrice: parseFloat(position.averagePrice || position.avgPrice || position.entryPrice || '0'), markPrice: parseFloat(position.markPrice || position.lastPrice || '0'), unrealizedPnl: parseFloat(position.unrealizedPnl || position.pnl || '0'), } } /** * 获取账户对状态 */ getAccountPairStatus(pairId: string): AccountPairStatus | null { const pair = this.accountPairs.get(pairId) if (!pair) return null return { pairId: pair.id, symbol: pair.symbol, account1: pair.account1.account, account2: pair.account2.account, currentPriceDeviation: pair.currentPriceDeviation, maxAllowedDeviation: this.config.maxPriceDeviation, isConverged: pair.isConverged, dailyOrderCount: pair.dailyOrderCount, lastUpdate: pair.lastUpdate, account1Position: pair.account1Position, account2Position: pair.account2Position, } } /** * 获取所有账户对状态 */ getAllAccountPairStatuses(): AccountPairStatus[] { return Array.from(this.accountPairs.keys()) .map(pairId => this.getAccountPairStatus(pairId)!) .filter(Boolean) } } // 类型定义 export interface AccountConfig { account: string privateKey: string agentWallet?: string agentPrivateKey?: string } export interface AccountPair { id: string account1: AccountConfig account2: AccountConfig symbol: string account1Position: Position | null account2Position: Position | null targetPriceDeviation: number currentPriceDeviation: number lastUpdate: number dailyOrderCount: number lastOrderDate: string isConverged: boolean stopLossOrders: Map // accountId -> orderId takeProfitOrders: Map // accountId -> orderId } export interface Position { symbol: string size: number side: 'long' | 'short' entryPrice: number averagePrice: number markPrice: number unrealizedPnl: number } export interface ConvergenceConfig { maxPriceDeviation: number // 最大允许价格偏差 convergenceStepPercent: number // 收敛步长百分比 minOrderSize: number // 最小订单量 maxOrderSize: number // 最大订单量 checkIntervalMs: number // 检查间隔 takeProfitPercent: number // 止盈百分比 stopLossPercent: number // 止损百分比 maxDailyOrders: number // 每日最大订单数 enableTrailingStop: boolean // 启用追踪止损 trailingStopPercent: number // 追踪止损百分比 } export interface TradeAction { action: 'open_long' | 'open_short' | 'increase_long' | 'increase_short' | 'reduce_long' | 'reduce_short' size: number expectedPrice: number } export interface ConvergenceTradeParams { account1Trade?: TradeAction account2Trade?: TradeAction convergenceType: 'initial_hedge' | 'price_adjustment' | 'rebalance' } export interface TradeResult { account: string success: boolean orderId?: string executedPrice?: number executedSize?: number error?: string } export interface AccountPairStatus { pairId: string symbol: string account1: string account2: string currentPriceDeviation: number maxAllowedDeviation: number isConverged: boolean dailyOrderCount: number lastUpdate: number account1Position: Position | null account2Position: Position | null } export const priceConvergenceManager = new PriceConvergenceManager()