Kaynağa Gözat

parallel run

Shawn Lu 1 yıl önce
ebeveyn
işleme
9436d4c684
2 değiştirilmiş dosya ile 32 ekleme ve 4 silme
  1. 2 4
      src/jobs/BridgeJob.ts
  2. 30 0
      src/utils/index.ts

+ 2 - 4
src/jobs/BridgeJob.ts

@@ -4,6 +4,7 @@ import { Task } from '@prisma/client'
 import { ChainId } from '../config/chain'
 import { StargateClient } from '../bridge/StargateClient'
 import { OwltoClient } from '../bridge/OwltoClient'
+import { forEachAsync } from '../utils'
 
 export class BridgeJob extends CronJob {
   constructor() {
@@ -24,10 +25,7 @@ export class BridgeJob extends CronJob {
         },
       },
     })
-    // process each task
-    for (const task of tasks) {
-      await this.processTask(task)
-    }
+    await forEachAsync(tasks, 10, async (task: Task) => await this.processTask(task))
   }
 
   async processTask(task: Task) {

+ 30 - 0
src/utils/index.ts

@@ -0,0 +1,30 @@
+export async function forEachAsync(array, concurrency, asyncFn) {
+  let running = 0 // 当前正在运行的任务数
+  let completed = 0 // 完成的任务数
+  let currentTaskIndex = 0 // 当前任务索引
+  return new Promise<void>(resolve => {
+    // 执行任务的函数
+    const runTask = async () => {
+      if (completed >= array.length) {
+        // 所有任务都完成了
+        resolve()
+        return
+      }
+      // 如果有可运行的任务且没有超过并发限制
+      while (currentTaskIndex < array.length && running < concurrency) {
+        const index = currentTaskIndex
+        const item = array[currentTaskIndex]
+        currentTaskIndex++ // 准备下一个任务的索引
+        running++ // 增加正在运行的任务数
+        void (async () => {
+          await asyncFn(item, index) // 等待异步任务完成
+          running-- // 任务完成,减少正在运行的任务数
+          completed++ // 增加完成的任务数
+          void runTask() // 尝试运行下一个任务
+        })()
+      }
+    }
+    // 初始化并发任务
+    runTask()
+  })
+}