Browse Source

Use the `postBlockHooks` to run the scheduled jobs

Theophile Sandoz 3 years ago
parent
commit
b4a820b41b
2 changed files with 23 additions and 25 deletions
  1. 1 1
      query-node/manifest.yml
  2. 22 24
      query-node/mappings/src/scheduler.ts

+ 1 - 1
query-node/manifest.yml

@@ -935,5 +935,5 @@ mappings:
     - handler: bootstrapData
       filter:
         height: "[0,0]" # will be executed only at genesis
-    - handler: launchScheduler
   postBlockHooks:
+    - handler: runScheduler

+ 22 - 24
query-node/mappings/src/scheduler.ts

@@ -1,6 +1,5 @@
 import { In } from 'typeorm'
-import { DatabaseManager, StoreContext } from '@joystream/hydra-common'
-import { eventEmitter, ProcessorEvents } from '@joystream/hydra-processor/lib/start/processor-events'
+import { BlockContext, DatabaseManager, StoreContext } from '@joystream/hydra-common'
 import { Bounty, BountyStage } from 'query-node/dist/model'
 import {
   bountyScheduleWorkSubmissionEnd,
@@ -9,41 +8,40 @@ import {
   scheduledFundingEnd,
 } from './bounty'
 
+const scheduleRecord: { [n: number]: (() => Promise<void>)[] } = {}
 let isSchedulerRunning = false
-let toBeScheduled: [number, () => void][] = []
+let toBeScheduled: [number, () => Promise<void>][] = []
 
-export async function launchScheduler({ store }: StoreContext): Promise<void> {
+export async function runScheduler({ block, store }: BlockContext & StoreContext): Promise<void> {
   if (!isSchedulerRunning) {
-    runScheduler()
+    isSchedulerRunning = true
     await scheduleMissedMappings(store)
   }
+
+  await runScheduledJobs(block.height)
 }
 
-export function scheduleAtBlock(blockNumber: number, job: () => void): void {
+export function scheduleAtBlock(blockNumber: number, job: () => Promise<void>): void {
   toBeScheduled.push([blockNumber, job])
 }
 
-function runScheduler(): void {
-  isSchedulerRunning = true
-  const scheduleRecord: { [n: number]: (() => void)[] } = {}
-
-  eventEmitter.on(ProcessorEvents.INDEXER_STATUS_CHANGE, (indexerStatus) => {
-    if (toBeScheduled.length) {
-      toBeScheduled.forEach(([blockNumber, job]) => {
-        if (blockNumber < indexerStatus.chainHeight) {
-          job()
-        } else {
-          scheduleRecord[blockNumber] = [...(scheduleRecord[blockNumber] ?? []), job]
-        }
-      })
-      toBeScheduled = []
+async function runScheduledJobs(currentBlock: number): Promise<void> {
+  // Queue new jobs
+  if (toBeScheduled.length) {
+    for (const [scheduledFor, job] of toBeScheduled) {
+      const blockNumber = Math.max(scheduledFor, currentBlock)
+      scheduleRecord[blockNumber] = [...(scheduleRecord[blockNumber] ?? []), job]
     }
+    toBeScheduled = []
+  }
 
-    if (scheduleRecord[indexerStatus.chainHeight]) {
-      scheduleRecord[indexerStatus.chainHeight].forEach((job) => job())
-      delete scheduleRecord[indexerStatus.chainHeight]
+  // Execute jobs scheduled for the current block
+  if (scheduleRecord[currentBlock]) {
+    for (const job of scheduleRecord[currentBlock]) {
+      await job()
     }
-  })
+    delete scheduleRecord[currentBlock]
+  }
 }
 
 async function scheduleMissedMappings(store: DatabaseManager): Promise<void> {