| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969 |
- import 'dotenv/config';
- import { readFileSync, mkdirSync } from "node:fs";
- import { dirname, join, resolve } from "node:path";
- import { parse } from "yaml";
- import pino from "pino";
- import { ShadowBook } from "../../../packages/utils/src/shadowBook";
- import { MarketDataAdapter } from "../../../packages/utils/src/marketDataAdapter";
- import { AdapterRegistry } from "../../../packages/connectors/pacifica/src/adapterRegistry";
- import { OrderRouter } from "../../../packages/execution/src/orderRouter";
- import { GlobalOrderCoordinator } from "../../../packages/execution/src/globalOrderCoordinator";
- import { MarketMaker } from "../../../packages/strategies/src/marketMaker";
- import { MicroScalper } from "../../../packages/strategies/src/microScalper";
- import { GridMaker, type AdaptiveGridConfig } from "../../../packages/strategies/src/gridMaker";
- import { HedgeEngine } from "../../../packages/hedge/src/hedgeEngine";
- import { PositionManager } from "../../../packages/portfolio/src/positionManager";
- import { RiskEngine } from "../../../packages/risk/src/riskEngine";
- import { PacificaWebSocket } from "../../../packages/connectors/pacifica/src/wsClient";
- import { PacificaWsOrderGateway } from "../../../packages/connectors/pacifica/src/wsOrderGateway";
- import { RateLimiter } from "../../../packages/connectors/pacifica/src/rateLimiter";
- import type { Order, Fill } from "../../../packages/domain/src/types";
- import type { KillSwitchConfig } from "../../../packages/risk/src/riskEngine";
- let globalFileDestination: pino.DestinationStream | undefined;
- const flushLogsSafe = () => {
- if (!globalFileDestination) return;
- const stream: any = globalFileDestination;
- if (typeof stream.flushSync !== 'function') return;
- try {
- stream.flushSync();
- } catch {
- // 静默忽略 flush 失败(sonic-boom 未 ready 时正常)
- }
- };
- const logger = createRunnerLogger();
- const cfg = parse(readFileSync("config/config.yaml","utf8"));
- const baseUrl = cfg.api_base || process.env.PACIFICA_API_BASE || "https://api.pacifica.fi/api/v1";
- type AccountConfigEntry = {
- api_base?: string;
- address?: string;
- private_key?: string;
- api_key?: string;
- secret?: string;
- subaccount?: string;
- role?: string;
- };
- type MarketMakerRawConfig = {
- tick_sz?: number;
- clip_sz?: number;
- spread_bps?: number;
- reprice_ms?: number;
- };
- type ScalperRawConfig = {
- clip_sz?: number;
- tp_bps?: number;
- sl_bps?: number;
- on_book_interval_ms?: number;
- trigger?: {
- spread_bps?: number;
- min_cooldown_ms?: number;
- };
- };
- const adapterRegistry = new AdapterRegistry();
- const accountsConfig: Record<string, AccountConfigEntry> = cfg.accounts || {};
- const accountEntries = Object.entries(accountsConfig);
- const wsGatewayConfigs: Array<{ id: string; address?: string; privateKey?: string }> = [];
- const addressToAccountId = new Map<string, string>();
- const fillHandlers = new Map<string, Array<(fill: Fill) => Promise<void>>>();
- let killSwitchActive = false;
- let gridMakerInstance: GridMaker | undefined;
- const baseClipEquityPct = cfg.grid?.base_clip_equity_pct ?? 0;
- const baseClipLeverage = cfg.grid?.base_clip_leverage ?? 1;
- const baseClipUsdFloor = cfg.grid?.base_clip_usd ?? 0;
- const wsRateLimiterConfig = cfg.execution?.ws_rate_limiter;
- let marketMakerInstance: MarketMaker | undefined;
- let scalperInstance: MicroScalper | undefined;
- let gridStatusTimer: NodeJS.Timeout | undefined;
- let gridTickTimer: NodeJS.Timeout | undefined;
- let marketMakerTimer: NodeJS.Timeout | undefined;
- let scalperTimer: NodeJS.Timeout | undefined;
- function registerFillHandler(accountId: string | undefined, handler: (fill: Fill) => Promise<void>): void {
- if (!accountId || killSwitchActive) return;
- const existing = fillHandlers.get(accountId);
- if (existing) {
- existing.push(handler);
- return;
- }
- fillHandlers.set(accountId, [handler]);
- }
- const fallbackAddress =
- process.env.PACIFICA_ACCOUNT_ADDRESS ||
- process.env.PACIFICA_API_KEY;
- const fallbackPrivateKey =
- process.env.PACIFICA_ACCOUNT_PRIVATE_KEY ||
- process.env.PACIFICA_API_SECRET;
- const fallbackSubaccount = process.env.PACIFICA_SUBACCOUNT || "main";
- if (accountEntries.length === 0) {
- adapterRegistry.register("maker", {
- baseUrl,
- apiKey: fallbackAddress,
- secret: fallbackPrivateKey,
- subaccount: fallbackSubaccount
- }, "maker");
- if (fallbackAddress && fallbackPrivateKey) {
- wsGatewayConfigs.push({ id: "maker", address: fallbackAddress, privateKey: fallbackPrivateKey });
- addressToAccountId.set(fallbackAddress, "maker");
- } else {
- logger.warn("WS gateway disabled for maker account: missing address/private key");
- }
- } else {
- for (const [id, value] of accountEntries) {
- if (typeof value !== "object" || value === null) continue;
- const upper = id.toUpperCase();
- const address =
- value.address ||
- value.api_key ||
- process.env[`${upper}_ADDRESS`] ||
- process.env[`${upper}_API_KEY`] ||
- fallbackAddress;
- const privateKey =
- value.private_key ||
- value.secret ||
- process.env[`${upper}_PRIVATE_KEY`] ||
- process.env[`${upper}_API_SECRET`] ||
- fallbackPrivateKey;
- const subaccount =
- value.subaccount ||
- process.env[`${upper}_SUBACCOUNT`] ||
- fallbackSubaccount;
- adapterRegistry.register(
- id,
- {
- baseUrl: value.api_base || baseUrl,
- apiKey: address,
- secret: privateKey,
- subaccount
- },
- value.role || id
- );
- if (address && privateKey) {
- wsGatewayConfigs.push({ id, address, privateKey });
- addressToAccountId.set(address, id);
- } else {
- logger.warn({ accountId: id }, "WS gateway disabled: missing address/private_key");
- }
- }
- }
- const makerEntry = adapterRegistry.findEntryByRole("maker") ?? adapterRegistry.list()[0];
- if (!makerEntry) {
- throw new Error("No Pacifica adapters configured");
- }
- const makerAccountId = makerEntry.id;
- const makerAdapter = makerEntry.adapter;
- const hedgerAccountId = adapterRegistry.findEntryByRole("hedger")?.id ?? makerAccountId;
- const shadow = new ShadowBook();
- async function main(){
- const strategyMode = cfg.strategy_mode || 'scalper';
- const primarySymbol = Array.isArray(cfg.symbols) && cfg.symbols.length ? cfg.symbols[0] : "BTC";
- const defaultGridSymbol = cfg.grid?.symbol || primarySymbol;
- const marketMakerCfg: MarketMakerRawConfig = cfg.mm || {};
- const scalperCfg: ScalperRawConfig = cfg.scalper || {
- trigger: { spread_bps: 2, min_cooldown_ms: 500 },
- tp_bps: 3,
- sl_bps: 6,
- clip_sz: 0.001
- };
- logger.info({ symbol: primarySymbol, baseUrl, strategyMode }, "runner start");
- const marketData = new MarketDataAdapter({
- symbols: [primarySymbol],
- shadowBook: shadow,
- fetchSnapshot: (symbol: string) => makerAdapter.getOrderBook(symbol),
- pollIntervalMs: cfg.market_data?.poll_interval_ms ?? 1000
- });
- await marketData.start();
- marketData.on('error', payload => {
- logger.warn({ symbol: payload.symbol, error: payload.error }, 'market data error');
- });
- const positionManager = new PositionManager();
- const globalCoordinator = new GlobalOrderCoordinator();
- const riskEngine = new RiskEngine(
- {
- maxBaseAbs: cfg.risk?.max_base_abs ?? 0,
- maxNotionalAbs: cfg.risk?.max_notional_abs ?? 0,
- maxOrderSz: cfg.risk?.max_order_sz ?? 0
- },
- mapKillSwitch(cfg.risk?.kill_switch)
- );
- const triggerKillSwitch = async (source?: string) => {
- if (killSwitchActive) return;
- killSwitchActive = true;
- const status = riskEngine.getStatus();
- const gridStatusSnapshot = gridMakerInstance?.getStatus();
- logger.error({ status, gridStatus: gridStatusSnapshot, source }, 'Kill-switch activated; halting strategies');
- const hookUrl = process.env.KILL_SWITCH_WEBHOOK;
- if (hookUrl && typeof fetch === 'function') {
- try {
- await fetch(hookUrl, {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({
- timestamp: new Date().toISOString(),
- source,
- status,
- gridStatus: gridStatusSnapshot
- })
- });
- } catch (error) {
- logger.warn({ error }, 'Failed to notify kill-switch webhook');
- }
- }
- if (gridStatusTimer) {
- clearInterval(gridStatusTimer);
- gridStatusTimer = undefined;
- }
- if (gridTickTimer) {
- clearInterval(gridTickTimer);
- gridTickTimer = undefined;
- }
- if (marketMakerTimer) {
- clearInterval(marketMakerTimer);
- marketMakerTimer = undefined;
- }
- if (scalperTimer) {
- clearInterval(scalperTimer);
- scalperTimer = undefined;
- }
- if (gridMakerInstance) {
- await gridMakerInstance.shutdown().catch(error => {
- logger.error({ error }, 'Failed to shutdown GridMaker');
- });
- gridMakerInstance = undefined;
- }
- marketMakerInstance = undefined;
- scalperInstance = undefined;
- fillHandlers.clear();
- const outstanding = globalCoordinator.list();
- await Promise.allSettled(outstanding.map(async snapshot => {
- try {
- const adapter = adapterRegistry.get(snapshot.accountId);
- const symbol = snapshot.symbol ?? defaultGridSymbol;
- if (snapshot.clientOrderId) {
- await adapter.cancelByClientId(snapshot.clientOrderId, symbol);
- } else {
- await adapter.cancel(snapshot.orderId, symbol);
- }
- globalCoordinator.release(snapshot.orderId);
- } catch (error) {
- logger.error({ orderId: snapshot.orderId, clientOrderId: snapshot.clientOrderId, error }, 'Failed to cancel outstanding order');
- }
- }));
- };
- const maybeTriggerKillSwitch = async (source?: string) => {
- if (killSwitchActive) return;
- if (!riskEngine.shouldHalt()) return;
- await triggerKillSwitch(source);
- };
- const refreshRisk = async (symbol: string) => {
- try {
- const snapshots = await adapterRegistry.collectPositions(symbol);
- const aggregated = await positionManager.snapshot(symbol, snapshots);
- const mid = shadow.mid(symbol) ?? aggregated.accounts[0]?.entryPx ?? 0;
- riskEngine.updateDeltaAbs(aggregated.base);
- const equity = aggregated.quote + aggregated.base * mid;
- riskEngine.updateEquity(equity);
- if (gridMakerInstance && baseClipEquityPct > 0) {
- const effectiveEquity = equity * Math.max(1, baseClipLeverage);
- const dynamicClip = Math.max(baseClipUsdFloor, effectiveEquity * baseClipEquityPct);
- gridMakerInstance.updateBaseClipUsd(dynamicClip);
- }
- await maybeTriggerKillSwitch('risk_refresh');
- } catch (error) {
- logger.error({ symbol, error }, 'Failed to refresh risk');
- }
- };
- const dispatchOrder = async (order: Order): Promise<{ id: string }> => {
- const accountId = order.accountId ?? makerAccountId;
- order.accountId = accountId;
- globalCoordinator.validate({
- accountId,
- symbol: order.symbol,
- side: order.side,
- price: order.px
- });
- const snapshots = await adapterRegistry.collectPositions(order.symbol);
- const aggregated = await positionManager.snapshot(order.symbol, snapshots);
- const mid = shadow.mid(order.symbol) ?? order.px;
- riskEngine.updateDeltaAbs(aggregated.base);
- const equity = aggregated.quote + aggregated.base * mid;
- riskEngine.updateEquity(equity);
- riskEngine.preCheck(order, aggregated, mid);
- if (killSwitchActive || riskEngine.shouldHalt()) {
- await maybeTriggerKillSwitch('pre_check');
- throw new Error('kill-switch active');
- }
- const adapter = adapterRegistry.get(accountId);
- const { id } = await adapter.place(order);
- globalCoordinator.register({
- orderId: id,
- clientOrderId: order.clientId,
- accountId,
- symbol: order.symbol,
- side: order.side,
- price: order.px,
- timestamp: Date.now()
- });
- return { id };
- };
- const router = new OrderRouter(
- async (order: Order) => dispatchOrder(order),
- (sym: string) => shadow.snapshot(sym),
- {
- maxBps: cfg.execution?.max_slippage_bps ?? 5,
- minIntervalMs: cfg.execution?.min_order_interval_ms ?? 0,
- bulkInitIntervalMs: cfg.execution?.bulk_init_interval_ms ?? 20
- }
- );
- const routeOrderUpdate = (accountId: string, update: any) => {
- const orderId = update?.order_id ?? update?.orderId;
- if (!orderId) return;
- const statusRaw = (update?.status ?? update?.state ?? '').toString().toLowerCase();
- if (statusRaw === 'filled' || statusRaw === 'canceled' || statusRaw === 'cancelled' || statusRaw === 'expired' || statusRaw === 'rejected') {
- globalCoordinator.release(orderId);
- }
- // 将订单更新传递给 GridMaker(如果是 maker 账户且 GridMaker 已初始化)
- if (accountId === makerAccountId && gridMakerInstance) {
- gridMakerInstance.handleOrderUpdate(update);
- }
- };
- const routeFill = async (accountId: string, fill: Fill) => {
- try {
- const handlers = fillHandlers.get(accountId);
- if (handlers) {
- for (const handler of handlers) {
- await handler(fill);
- }
- }
- globalCoordinator.release(fill.orderId);
- await refreshRisk(fill.symbol);
- if (accountId === hedgerAccountId) {
- riskEngine.recordHedgeSuccess();
- }
- await maybeTriggerKillSwitch('fill');
- } catch (error) {
- logger.error({ accountId, fill, error }, 'Failed to process fill');
- }
- };
- await setupWsOrderGateways({
- wsUrl: cfg.ws_url,
- routeFill,
- routeOrder: routeOrderUpdate,
- routeAccount: async (accountId, update) => {
- try {
- const symbols = extractSymbolsFromAccount(update);
- if (symbols.size === 0) {
- await refreshRisk(primarySymbol);
- } else {
- for (const symbol of symbols) {
- await refreshRisk(symbol);
- }
- }
- await maybeTriggerKillSwitch('account');
- } catch (error) {
- logger.error({ accountId, update, error }, 'Failed to process account update');
- }
- }
- });
- router.attachCancelHandlers(
- async orderId => {
- try {
- const snapshot = globalCoordinator.peek(orderId);
- const accountId = snapshot?.accountId ?? makerAccountId;
- const adapter = adapterRegistry.get(accountId);
- const symbol = snapshot?.symbol ?? defaultGridSymbol;
- if (snapshot?.clientOrderId) {
- await adapter.cancelByClientId(snapshot.clientOrderId, symbol);
- globalCoordinator.release(orderId);
- } else {
- await adapter.cancel(orderId, symbol);
- globalCoordinator.release(orderId);
- }
- } catch (error) {
- logger.error({ orderId, error: normalizeError(error) }, 'Failed to cancel order');
- }
- },
- async clientId => {
- try {
- const snapshot = globalCoordinator.peekByClientId(clientId);
- const accountId = snapshot?.accountId ?? makerAccountId;
- const adapter = adapterRegistry.get(accountId);
- const symbol = snapshot?.symbol ?? defaultGridSymbol;
- await adapter.cancelByClientId(clientId, symbol);
- globalCoordinator.releaseByClientId(clientId);
- } catch (error) {
- logger.error({ clientId, error: normalizeError(error) }, 'Failed to cancel by clientId');
- }
- }
- );
- const hedgeEngine = new HedgeEngine(
- cfg.hedge || { kp: 0.6, ki: 0.05, Qmax: 0.4, minIntervalMs: 200 },
- async (order: Order) => {
- if (killSwitchActive) {
- throw new Error('kill-switch active');
- }
- try {
- const result = await dispatchOrder({ ...order, accountId: order.accountId ?? hedgerAccountId });
- riskEngine.recordHedgeSuccess();
- return result;
- } catch (error) {
- riskEngine.recordHedgeFailure();
- await maybeTriggerKillSwitch('hedge_failure');
- throw error;
- }
- },
- () => shadow.mid(primarySymbol)
- );
- // 根据策略模式启动不同策略
- if (strategyMode === 'grid' || strategyMode === 'both') {
- if (baseClipEquityPct > 0) {
- await refreshRisk(primarySymbol);
- }
- const computeBaseClipUsd = () => {
- if (baseClipEquityPct <= 0) return baseClipUsdFloor;
- const status = riskEngine.getStatus();
- const equity = status.currentEquity > 0 ? status.currentEquity : status.peakEquity;
- const effectiveEquity = equity * Math.max(1, baseClipLeverage);
- const dynamic = effectiveEquity * baseClipEquityPct;
- return Math.max(baseClipUsdFloor, dynamic);
- };
- logger.info({ gridConfig: cfg.grid }, 'Starting Grid strategy');
- const gridConfig = {
- symbol: cfg.grid.symbol || primarySymbol,
- gridStepBps: cfg.grid.grid_step_bps,
- gridRangeBps: cfg.grid.grid_range_bps,
- baseClipUsd: computeBaseClipUsd(),
- maxLayers: cfg.grid.max_layers,
- hedgeThresholdBase: cfg.grid.hedge_threshold_base,
- accountId: cfg.grid.account_id || makerAccountId,
- tickSize: cfg.grid.tick_size,
- lotSize: cfg.grid.lot_size,
- incrementalMode: cfg.grid.incremental_mode ?? false
- };
- const adaptiveConfig: AdaptiveGridConfig | undefined = cfg.grid.adaptive?.enabled
- ? {
- enabled: true,
- volatilityWindowMinutes: cfg.grid.adaptive.volatility_window_minutes ?? 30,
- minVolatilityBps: cfg.grid.adaptive.min_volatility_bps ?? 20,
- maxVolatilityBps: cfg.grid.adaptive.max_volatility_bps ?? 200,
- minGridStepBps: cfg.grid.adaptive.min_grid_step_bps ?? 10,
- maxGridStepBps: cfg.grid.adaptive.max_grid_step_bps ?? 100,
- recenterEnabled: cfg.grid.adaptive.recenter_enabled ?? true,
- recenterThresholdBps: cfg.grid.adaptive.recenter_threshold_bps ?? 150,
- recenterCooldownMs: cfg.grid.adaptive.recenter_cooldown_ms ?? 300_000,
- minStepChangeRatio: cfg.grid.adaptive.min_step_change_ratio ?? 0.2,
- minSamples: cfg.grid.adaptive.min_samples,
- maxCadenceMs: cfg.grid.adaptive.max_cadence_ms,
- hedgePendingTimeoutMs: cfg.grid.adaptive.hedge_pending_timeout_ms,
- postOnlyCushionBps: cfg.grid.adaptive.post_only_cushion_bps ?? 5,
- minLayers: cfg.grid.adaptive.min_layers
- }
- : undefined;
- const fillRateControlConfig = cfg.grid.fill_rate_control?.enabled
- ? {
- targetFillsPerMinute: cfg.grid.fill_rate_control.target_fills_per_minute ?? 30,
- targetMakerRatio: cfg.grid.fill_rate_control.target_maker_ratio ?? 0.85,
- maxSelfTradeRatio: cfg.grid.fill_rate_control.max_self_trade_ratio ?? 0.01,
- kp_step: cfg.grid.fill_rate_control.kp_step ?? 0.02,
- ki_step: cfg.grid.fill_rate_control.ki_step ?? 0.002,
- kp_clip: cfg.grid.fill_rate_control.kp_clip ?? 0.1,
- ki_clip: cfg.grid.fill_rate_control.ki_clip ?? 0.01,
- minGridStepBps: cfg.grid.fill_rate_control.min_grid_step_bps ?? 0.5,
- maxGridStepBps: cfg.grid.fill_rate_control.max_grid_step_bps ?? 3.0,
- minClipUsd: cfg.grid.fill_rate_control.min_clip_usd ?? 15,
- maxClipUsd: cfg.grid.fill_rate_control.max_clip_usd ?? 60,
- minMakerRatioForAdjust: cfg.grid.fill_rate_control.min_maker_ratio_for_adjust ?? 0.70,
- emergencyStepMultiplier: cfg.grid.fill_rate_control.emergency_step_multiplier ?? 1.5
- }
- : undefined;
- const cancelAllOrders = async (symbol: string) => {
- try {
- const adapter = adapterRegistry.get(gridConfig.accountId);
- await adapter.cancelAll(symbol);
- } catch (error) {
- logger.error({ symbol, error: normalizeError(error) }, 'Grid cancel_all failed');
- throw error;
- }
- };
- const releaseOrder = (orderId: string, clientOrderId?: string) => {
- if (clientOrderId) {
- globalCoordinator.releaseByClientId(clientOrderId);
- }
- if (orderId) {
- globalCoordinator.release(orderId);
- }
- };
- const gridMaker = new GridMaker(
- gridConfig,
- router,
- hedgeEngine,
- shadow,
- logger,
- adaptiveConfig,
- cancelAllOrders,
- releaseOrder,
- fillRateControlConfig
- );
- gridMakerInstance = gridMaker;
- registerFillHandler(gridConfig.accountId, async fill => {
- if (fill.symbol !== gridConfig.symbol) return;
- await gridMaker.onFill(fill);
- });
- registerFillHandler(hedgerAccountId, async fill => {
- if (fill.symbol !== gridConfig.symbol) return;
- await gridMaker.onHedgeFill(fill);
- });
- // 清理旧订单并初始化网格
- logger.info({ symbol: gridConfig.symbol, accountId: gridConfig.accountId }, 'Cleaning up old orders before grid initialization');
- try {
- await cancelAllOrders(gridConfig.symbol);
- logger.info('Old orders cancelled successfully');
- } catch (error) {
- logger.warn({ error: normalizeError(error) }, 'Failed to cancel old orders, proceeding with initialization');
- }
- await gridMaker.initialize();
- // 模拟 Fill 事件监听(实际需要从 adapter 或 WebSocket 获取)
- // adapter.onFill((fill: Fill) => gridMaker.onFill(fill));
- // 定期输出状态
- gridStatusTimer = setInterval(() => {
- if (killSwitchActive) return;
- const status = gridMaker.getStatus();
- logger.info({ status }, 'Grid status');
- }, 30000); // 每 30 秒
- gridTickTimer = setInterval(async () => {
- if (killSwitchActive) return;
- try {
- await gridMaker.onTick();
- } catch (error) {
- console.error('GridMaker onTick failed:', error);
- logger.error({
- error: error instanceof Error ? {
- message: error.message,
- stack: error.stack,
- name: error.name
- } : error
- }, 'GridMaker onTick failed');
- }
- }, cfg.grid.adaptive?.tick_interval_ms ?? 60_000);
- }
- if (strategyMode === 'scalper' || strategyMode === 'both') {
- logger.info('Starting MarketMaker + Scalper strategies');
- const mm = new MarketMaker(
- {
- symbol: primarySymbol,
- tickSz: marketMakerCfg.tick_sz ?? 0.5,
- clipSz: marketMakerCfg.clip_sz ?? 0.001,
- spreadBps: marketMakerCfg.spread_bps ?? 1.6
- },
- router,
- ()=>shadow.snapshot(primarySymbol),
- logger
- );
- marketMakerInstance = mm;
- registerFillHandler(makerAccountId, async fill => {
- if (fill.symbol !== primarySymbol) return;
- await mm.onFill(fill);
- });
- const scalp = new MicroScalper(
- {
- symbol: primarySymbol,
- clipSz: scalperCfg.clip_sz ?? 0.001,
- triggerSpreadBps: scalperCfg.trigger?.spread_bps ?? 1.8,
- tpBps: scalperCfg.tp_bps ?? 3,
- slBps: scalperCfg.sl_bps ?? 6,
- cooldownMs: scalperCfg.trigger?.min_cooldown_ms ?? 250
- },
- router,
- ()=>shadow.snapshot(primarySymbol),
- logger
- );
- scalperInstance = scalp;
- registerFillHandler(makerAccountId, async fill => {
- if (fill.symbol !== primarySymbol) return;
- await scalp.onFill(fill);
- });
- const mmInterval = marketMakerCfg.reprice_ms ?? 300;
- const scalperInterval = scalperCfg.on_book_interval_ms ?? 150;
- marketMakerTimer = setInterval(async () => {
- if (killSwitchActive) return;
- try {
- await mm.onTick();
- } catch (error) {
- logger.error({ error }, 'MarketMaker onTick failed');
- }
- }, mmInterval);
- scalperTimer = setInterval(async () => {
- if (killSwitchActive) return;
- try {
- await scalp.onBook();
- } catch (error) {
- logger.error({ error }, 'MicroScalper onBook failed');
- }
- }, scalperInterval);
- }
- }
- main().catch(e => {
- logger.error({ error: e }, 'Fatal error in main');
- setImmediate(() => process.exit(1));
- });
- function createRunnerLogger() {
- const logLevel = (process.env.LOG_LEVEL ?? "info") as pino.Level;
- const streams: pino.StreamEntry[] = [
- { level: logLevel, stream: process.stdout }
- ];
- let filePath: string | undefined;
- try {
- const fileLevel = (process.env.LOG_FILE_LEVEL ?? logLevel) as pino.Level;
- const customPath = process.env.LOG_FILE;
- const logDir = process.env.LOG_DIR ?? "logs";
- const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
- const resolvedPath = resolve(customPath ?? join(logDir, `runner-${timestamp}.log`));
- mkdirSync(dirname(resolvedPath), { recursive: true });
- const destination = pino.destination({ dest: resolvedPath, mkdir: true, sync: false });
- globalFileDestination = destination;
- streams.push({ level: fileLevel, stream: destination });
- filePath = resolvedPath;
- } catch (error) {
- console.error("Failed to enable file logging", error);
- }
- const instance =
- streams.length > 1
- ? pino({ level: logLevel }, pino.multistream(streams))
- : pino({ level: logLevel });
- if (filePath) {
- instance.info({ logFile: filePath }, "File logging enabled");
- }
- // 优雅退出处理:确保日志刷新到磁盘
- const gracefulShutdown = (signal: string) => {
- instance.info({ signal }, 'Received shutdown signal, flushing logs...');
- flushLogsSafe();
- setTimeout(() => {
- instance.info('Logs flushed, exiting');
- process.exit(0);
- }, 100);
- };
- process.on('SIGINT', () => gracefulShutdown('SIGINT'));
- process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
- process.on('uncaughtException', (error) => {
- console.error('Uncaught exception:', error);
- try {
- instance.error({ error: error instanceof Error ? { message: error.message, stack: error.stack } : error }, 'Uncaught exception');
- } catch {}
- flushLogsSafe();
- process.exit(1);
- });
- process.on('unhandledRejection', (reason) => {
- instance.error({ reason }, 'Unhandled rejection');
- flushLogsSafe();
- process.exit(1);
- });
- return instance;
- }
- async function setupWsOrderGateways(options: {
- wsUrl?: string;
- routeFill: (accountId: string, fill: Fill) => Promise<void>;
- routeOrder: (accountId: string, update: any) => void;
- routeAccount: (accountId: string, update: any) => Promise<void>;
- }): Promise<void> {
- const { wsUrl, routeFill, routeOrder, routeAccount } = options;
- if (!wsUrl) {
- logger.warn('No ws_url configured; skipping WebSocket order gateway setup');
- return;
- }
- const tasks = wsGatewayConfigs
- .filter(entry => entry.address && entry.privateKey)
- .map(async entry => {
- try {
- const address = entry.address!;
- const privateKey = entry.privateKey!;
- const wsClient = new PacificaWebSocket({
- url: wsUrl,
- apiKey: address,
- secret: privateKey
- });
- const limiter = wsRateLimiterConfig
- ? new RateLimiter({
- burst: wsRateLimiterConfig.burst ?? 6,
- refillPerSec: wsRateLimiterConfig.refill_per_sec ?? 5,
- maxQueueDepth: wsRateLimiterConfig.max_queue_depth ?? 100
- })
- : undefined;
- const gateway = new PacificaWsOrderGateway(wsClient, {
- apiKey: address,
- secret: privateKey
- }, undefined, undefined, logger, limiter);
- wsClient.on('message', message => {
- handlePrivateMessage(address, message, routeFill, routeOrder, routeAccount);
- });
- await gateway.connect();
- adapterRegistry.get(entry.id).attachWsGateway(gateway);
- const tradesParams = { source: 'account_trades', account: address };
- const ordersParams = { source: 'account_order_updates', account: address };
- wsClient.subscribeAuthenticated('account_trades', tradesParams);
- wsClient.subscribeAuthenticated('account_order_updates', ordersParams);
- logger.info({ accountId: entry.id, tradesParams, ordersParams }, 'WS order gateway connected and subscribed to channels');
- } catch (error) {
- logger.error({ accountId: entry.id, error }, 'Failed to initialize WS order gateway');
- throw error;
- }
- });
- await Promise.all(tasks);
- }
- function handlePrivateMessage(
- address: string,
- raw: unknown,
- routeFill: (accountId: string, fill: Fill) => Promise<void>,
- routeOrder: (accountId: string, update: any) => void,
- routeAccount: (accountId: string, update: any) => Promise<void>
- ): void {
- const accountId = addressToAccountId.get(address);
- if (!accountId) return;
- let text: string;
- if (typeof raw === "string") {
- text = raw;
- } else if (raw instanceof Buffer) {
- text = raw.toString("utf8");
- } else if (typeof (raw as any)?.toString === "function") {
- text = (raw as any).toString();
- } else {
- return;
- }
- let parsed: any;
- try {
- parsed = JSON.parse(text);
- } catch {
- return;
- }
- const channel = parsed?.channel;
- if (typeof channel !== "string") return;
- if (channel === "account_trades" || (channel.startsWith("fills.") && channel.endsWith(address))) {
- const payload = parsed.data;
- const records = Array.isArray(payload) ? payload : [payload];
- logger.info({ accountId, channel, recordCount: records.length }, 'Received fill message from WebSocket');
- for (const record of records) {
- if (record?.u && record.u !== address) continue;
- const fill = mapFillPayload(record);
- if (!fill) {
- logger.warn({ accountId, record }, 'Failed to map fill payload');
- continue;
- }
- logger.info({ accountId, fill }, 'Processing fill event');
- routeFill(accountId, fill).catch(error => {
- logger.error({ accountId, error }, 'Failed to route fill');
- });
- }
- return;
- }
- if (channel === "account_order_updates" || (channel.startsWith("orders.") && channel.endsWith(address))) {
- const payload = parsed.data;
- const records = Array.isArray(payload) ? payload : [payload];
- logger.info({ accountId, channel, recordCount: records.length }, 'Received order update message from WebSocket');
- for (const record of records) {
- if (record?.u && record.u !== address) continue;
- const update = record ?? {};
- logger.info({ accountId, orderUpdate: update }, 'Processing order update event');
- routeOrder(accountId, update);
- }
- return;
- }
- if (channel.startsWith("account.") && channel.endsWith(address)) {
- const payload = parsed.data ?? parsed;
- routeAccount(accountId, payload).catch(error => {
- logger.error({ accountId, error }, 'Failed to route account update');
- });
- return;
- }
- logger.debug({ accountId, channel, payload: parsed?.data ?? parsed }, 'Unhandled Pacifica WS message');
- }
- function mapFillPayload(data: any): Fill | undefined {
- if (!data) return undefined;
- const orderId = data.order_id ?? data.orderId ?? data.i ?? data.orderId;
- const symbol = data.symbol ?? data.s;
- const rawSideInput = (data.side ?? data.direction ?? data.d ?? data.ts ?? '').toString().toLowerCase();
- let side: 'buy' | 'sell' | undefined;
- if (rawSideInput === 'bid' || rawSideInput === 'buy' || rawSideInput.includes('long')) {
- side = 'buy';
- } else if (rawSideInput === 'ask' || rawSideInput === 'sell' || rawSideInput.includes('short')) {
- side = 'sell';
- }
- const price = Number(data.price ?? data.px ?? data.p);
- const sizeRaw = Number(data.size ?? data.sz ?? data.amount ?? data.a);
- if (!orderId || !symbol || !side || !Number.isFinite(price) || !Number.isFinite(sizeRaw)) {
- return undefined;
- }
- const size = Math.abs(sizeRaw);
- const fee = data.fee !== undefined ? Number(data.fee) : data.f !== undefined ? Number(data.f) : 0;
- const liquidityRaw = (data.liquidity ?? data.te ?? '').toString().toLowerCase();
- const liquidity = liquidityRaw.includes('taker') ? 'taker' : 'maker';
- const tradeId = String(data.trade_id ?? data.tradeId ?? data.h ?? `${orderId}-${Date.now()}`);
- const tsValue = data.ts !== undefined && Number.isFinite(Number(data.ts)) ? Number(data.ts)
- : data.t !== undefined ? Number(data.t)
- : Date.now();
- const ts = Number.isFinite(tsValue) ? tsValue : Date.now();
- return {
- orderId: String(orderId),
- tradeId,
- symbol: String(symbol),
- side,
- px: price,
- sz: size,
- fee,
- liquidity: liquidity as "maker" | "taker",
- ts
- };
- }
- function mapKillSwitch(raw: any): KillSwitchConfig | undefined {
- if (!raw) return undefined;
- const drawdownPct = Number(raw.drawdown_pct ?? raw.drawdownPct ?? 0);
- const triggers = Array.isArray(raw.triggers)
- ? raw.triggers
- .map((t: any) => ({
- type: t.type,
- threshold: Number(t.threshold)
- }))
- .filter((trigger: { type?: string; threshold: number }): trigger is { type: string; threshold: number } => {
- return typeof trigger.type === "string" && !Number.isNaN(trigger.threshold);
- })
- : undefined;
- return {
- drawdownPct: Math.abs(drawdownPct),
- triggers
- };
- }
- function extractSymbolsFromAccount(update: any): Set<string> {
- const symbols = new Set<string>();
- if (!update) return symbols;
- const collect = (value: any) => {
- if (!value) return;
- const items = Array.isArray(value) ? value : [value];
- for (const item of items) {
- const symbol = item?.symbol ?? item?.asset ?? item?.pair;
- if (typeof symbol === "string" && symbol.trim().length > 0) {
- symbols.add(symbol.trim());
- }
- }
- };
- if (Array.isArray(update)) {
- for (const item of update) {
- const nested = extractSymbolsFromAccount(item);
- nested.forEach(s => symbols.add(s));
- }
- return symbols;
- }
- collect(update.positions ?? update.position);
- collect(update.balances ?? update.balance);
- collect(update.holdings);
- if (update.symbol) {
- collect(update);
- }
- return symbols;
- }
- function normalizeError(error: unknown): { message?: string; name?: string; status?: number; code?: string } | undefined {
- if (!error) return undefined;
- if (error instanceof Error) {
- const normalized: { message: string; name: string; status?: number; code?: string } = {
- message: error.message,
- name: error.name
- };
- const anyErr = error as any;
- if (typeof anyErr.status === "number") {
- normalized.status = anyErr.status;
- }
- if (anyErr.code !== undefined) {
- normalized.code = String(anyErr.code);
- }
- return normalized;
- }
- if (typeof error === "object") {
- return error as any;
- }
- return { message: String(error) };
- }
|