Browse Source

integration-tests: resource locking

Mokhtar Naamani 4 years ago
parent
commit
d3b58680e6

+ 2 - 1
tests/network-tests/src/Flow.ts

@@ -1,5 +1,6 @@
 import { Api } from './Api'
 import { QueryNodeApi } from './QueryNodeApi'
+import { ResourceLocker } from './Resources'
 
-export type FlowProps = { api: Api; env: NodeJS.ProcessEnv; query: QueryNodeApi }
+export type FlowProps = { api: Api; env: NodeJS.ProcessEnv; query: QueryNodeApi; lock: ResourceLocker }
 export type Flow = (args: FlowProps) => Promise<void>

+ 16 - 0
tests/network-tests/src/InvertedPromise.ts

@@ -0,0 +1,16 @@
+function noop(): void {
+  // No-Op
+}
+
+export class InvertedPromise<T> {
+  public resolve: (value: T) => void = noop
+  public reject: (reason?: any) => void = noop
+  public readonly promise: Promise<T>
+
+  constructor() {
+    this.promise = new Promise((resolve, reject) => {
+      this.resolve = resolve
+      this.reject = reject
+    })
+  }
+}

+ 18 - 25
tests/network-tests/src/Job.ts

@@ -3,26 +3,11 @@ import { EventEmitter } from 'events'
 import { ApiFactory } from './Api'
 import { QueryNodeApi } from './QueryNodeApi'
 import { Flow } from './Flow'
+import { InvertedPromise } from './InvertedPromise'
+import { ResourceManager } from './Resources'
 
 export type JobProps = { apiFactory: ApiFactory; env: NodeJS.ProcessEnv; query: QueryNodeApi }
 
-function noop() {
-  // No-Op
-}
-
-class InvertedPromise<T> {
-  public resolve: (value: T) => void = noop
-  public reject: (reason?: any) => void = noop
-  public readonly promise: Promise<T>
-
-  constructor() {
-    this.promise = new Promise((resolve, reject) => {
-      this.resolve = resolve
-      this.reject = reject
-    })
-  }
-}
-
 export enum JobOutcome {
   Succeeded = 'Succeeded',
   Failed = 'Failed',
@@ -92,7 +77,7 @@ export class Job {
     return this._label
   }
 
-  private async run(jobProps: JobProps): Promise<void> {
+  private async run(jobProps: JobProps, resources: ResourceManager): Promise<void> {
     // prevent any additional changes to configuration
     this._locked = true
 
@@ -108,13 +93,21 @@ export class Job {
 
     this.debug('Running')
     const flowRunResults = await Promise.allSettled(
-      this._flows.map((flow) =>
-        flow({
-          api: jobProps.apiFactory.getApi(`${this.label}:${flow.name}`),
-          env: jobProps.env,
-          query: jobProps.query,
-        })
-      )
+      this._flows.map(async (flow) => {
+        const locker = resources.createLocker()
+        try {
+          await flow({
+            api: jobProps.apiFactory.getApi(`${this.label}:${flow.name}`),
+            env: jobProps.env,
+            query: jobProps.query,
+            lock: locker.lock,
+          })
+        } catch (err) {
+          locker.release()
+          throw err
+        }
+        locker.release()
+      })
     )
 
     flowRunResults.forEach((result, ix) => {

+ 5 - 4
tests/network-tests/src/JobManager.ts

@@ -1,8 +1,9 @@
 import { EventEmitter } from 'events'
 import { Flow } from './Flow'
-import { Job, JobOutcome } from './Job'
+import { Job, JobOutcome, JobProps } from './Job'
 import { ApiFactory } from './Api'
 import { QueryNodeApi } from './QueryNodeApi'
+import { ResourceManager } from './Resources'
 
 export class JobManager extends EventEmitter {
   private _jobs: Job[] = []
@@ -26,7 +27,7 @@ export class JobManager extends EventEmitter {
     return job
   }
 
-  private getJobProps() {
+  private getJobProps(): JobProps {
     return {
       env: this._env,
       query: this._query,
@@ -34,8 +35,8 @@ export class JobManager extends EventEmitter {
     }
   }
 
-  public async run(): Promise<void> {
-    this.emit('run', this.getJobProps())
+  public async run(resources: ResourceManager): Promise<void> {
+    this.emit('run', this.getJobProps(), resources)
 
     const outcomes = await Promise.all(this._jobs.map((job) => job.outcome))
 

+ 90 - 0
tests/network-tests/src/Resources.ts

@@ -0,0 +1,90 @@
+import { assert } from 'chai'
+import { Utils } from './utils'
+import Debugger from 'debug'
+
+const debug = Debugger('resources')
+
+export type Resources = Record<ResourceName, Resource>
+export type ResourceLocker = (resource: ResourceName, timeout?: number) => Promise<() => void>
+
+export class Resource {
+  private name: string
+
+  // the number of concurrent locks that can be acquired concurrently before the resource
+  // becomes unavailable until a lock is released.
+  private readonly concurrency: number
+  private lockCount = 0
+
+  constructor(key: string, concurrency?: number) {
+    this.name = key
+    this.concurrency = concurrency || 1
+  }
+
+  public async lock(timeoutMinutes = 1): Promise<() => void> {
+    const timeoutAt = Date.now() + timeoutMinutes * 60 * 1000
+
+    while (this.lockCount === this.concurrency) {
+      debug(`waiting for ${this.name}`)
+      await Utils.wait(30000)
+      if (Date.now() > timeoutAt) throw new Error(`Timeout getting resource lock: ${this.name}`)
+    }
+
+    debug(`acquired ${this.name}`)
+    this.lockCount++
+
+    // Return a function used to release the lock
+    return (() => {
+      let called = false
+      return () => {
+        if (called) return
+        called = true
+        debug(`released ${this.name}`)
+        this.lockCount--
+      }
+    })()
+  }
+}
+
+export enum ResourceName {
+  Council = 'Council',
+  Proposals = 'Proposals',
+}
+
+export class ResourceManager {
+  // Internal Map
+  private resources = new Map<string, Resource>()
+
+  private readonly locks: Resources
+
+  constructor() {
+    this.locks = this.createNamedResources()
+  }
+
+  private add(key: string, concurrency?: number): Resource {
+    assert(!this.resources.has(key))
+    this.resources.set(key, new Resource(key, concurrency))
+    return this.resources.get(key) as Resource
+  }
+
+  private createNamedResources(): Resources {
+    return {
+      [ResourceName.Council]: this.add(ResourceName.Council),
+      [ResourceName.Proposals]: this.add(ResourceName.Proposals, 5),
+    }
+  }
+
+  public createLocker(): { release: () => void; lock: ResourceLocker } {
+    const unlockers: Array<() => void> = []
+    const release = () => {
+      unlockers.forEach((unlock) => unlock())
+    }
+    return {
+      release,
+      lock: async (resource: ResourceName, timeout?: number) => {
+        const unlock = await this.locks[resource].lock(timeout)
+        unlockers.push(unlock)
+        return unlock
+      },
+    }
+  }
+}

+ 4 - 1
tests/network-tests/src/Scenario.ts

@@ -7,6 +7,7 @@ import Debugger from 'debug'
 import { Flow } from './Flow'
 import { Job } from './Job'
 import { JobManager } from './JobManager'
+import { ResourceManager } from './Resources'
 
 export type ScenarioProps = {
   env: NodeJS.ProcessEnv
@@ -45,8 +46,10 @@ export async function scenario(scene: (props: ScenarioProps) => Promise<void>):
 
   await scene({ env, debug, job: jobs.createJob.bind(jobs) })
 
+  const resources = new ResourceManager()
+
   try {
-    await jobs.run()
+    await jobs.run(resources)
   } catch (err) {
     console.error(err)
     process.exit(-1)

+ 4 - 1
tests/network-tests/src/flows/proposals/councilSetup.ts

@@ -5,13 +5,16 @@ import { ElectCouncilFixture } from '../../fixtures/councilElectionModule'
 import { BuyMembershipHappyCaseFixture } from '../../fixtures/membershipModule'
 import Debugger from 'debug'
 import { FixtureRunner } from '../../Fixture'
+import { ResourceName } from '../../Resources'
 
-export default async function councilSetup({ api, env }: FlowProps): Promise<void> {
+export default async function councilSetup({ api, env, lock }: FlowProps): Promise<void> {
   const label = 'councilSetup'
   const debug = Debugger(`flow:${label}`)
 
   debug('Started')
 
+  await lock(ResourceName.Council)
+
   // Skip creating council if already elected
   if ((await api.getCouncil()).length) {
     return debug('Skipping council setup. A Council is already elected')