@@ -4,67 +4,148 @@ import { QueryNodeApi } from './QueryNodeApi'
import { config } from 'dotenv'
import { ApolloClient, InMemoryCache } from '@apollo/client'
import Debugger from 'debug'
+import { EventEmitter } from 'events'
+function noop() {
+ // No-Op
+class InvertedPromise<T> {
+ public resolve: (value: T) => void = noop
+ public reject: (reason?: any) => void = noop
+ public readonly promise: Promise<T>
+ constructor() {
+ this.promise = new Promise((resolve, reject) => {
+ this.resolve = resolve
+ this.reject = reject
+ })
+ }
export type FlowArgs = { api: Api; env: NodeJS.ProcessEnv; query: QueryNodeApi }
export type Flow = (args: FlowArgs) => Promise<void>
+enum JobOutcome {
+ Succeeded,
+ Failed,
+ Skipped,
class Job {
- private dependencies: Job[]
- private flowArgs: FlowArgs
+ private required: Job[] = []
+ private dependencies: Job[] = []
private flows: Flow[]
private manager: FlowManager
+ private _outcome: InvertedPromise<JobOutcome>
+ private locked = false
+ private name: string
+ private debug: Debugger.Debugger
- constructor(manager: FlowManager, flowArgs: FlowArgs, flows: Flow[]) {
+ constructor(manager: FlowManager, flows: Flow[], name: string) {
+ this.name = name
this.manager = manager
- this.flowArgs = flowArgs
this.flows = flows
+ this._outcome = new InvertedPromise<JobOutcome>()
+ this.manager.on('run', this.run.bind(this))
+ this.debug = Debugger(`job:${this.name}`)
- // Depend on another job to complete
- public afterSuccessOf(job: Job): Job {
- this.dependencies.push(job)
+ // Depend on another job to complete successfully
+ public requires(job: Job): Job {
+ if (this.locked) throw new Error('Job is locked')
+ if (job === this) throw new Error('Job Cannot depend on itself')
+ if (job.hasDependencyOn(this)) {
+ throw new Error('Job Circualr dependency')
+ }
+ this.required.push(job)
return this
- // Depend on another job to complete
- public afterSuccessOrFailureOf(job: Job): Job {
+ // Depend on another job to complete (does not matter if it is successful)
+ public after(job: Job): Job {
+ if (this.locked) throw new Error('Job is locked')
+ if (job === this) throw new Error('Job Cannot depend on itself')
+ if (job.hasDependencyOn(this)) {
+ throw new Error('Job Circualr dependency')
+ }
return this
- // Allows job to fail (one or more flows failing) without interrupting the scenario
- // The scenario will still result in failure, but allows other jobs and flows to be tested
- public allowFailure(): Job {
- return this
+ public then(job: Job): Job {
+ job.requires(this)
+ return job
+ }
+ public hasDependencyOn(job: Job): boolean {
+ return !!this.required.find((j) => j === job) || !!this.dependencies.find((j) => j === job)
// configure to have flows run serially instead of in parallel
- public serially(): Job {
- return this
+ // public serially(): Job {
+ // return this
+ // }
+ get outcome(): Promise<JobOutcome> {
+ return this._outcome.promise
+ }
+ private async run(flowArgs: FlowArgs): Promise<void> {
+ // prevent any additional changes to configuration
+ this.locked = true
+ // wait for all required dependencies to complete successfully
+ const requiredOutcomes = await Promise.all(this.required.map((job) => job.outcome))
+ if (requiredOutcomes.find((outcome) => outcome !== JobOutcome.Succeeded)) {
+ this.debug('Skipped because required jobs not successful')
+ return this._outcome.resolve(JobOutcome.Skipped)
+ }
+ // Wait for other jobs to complete, irrespective of outcome
+ await Promise.all(this.dependencies.map((job) => job.outcome))
+ this.debug('Running flows')
+ const flowRunResults = await Promise.allSettled(this.flows.map((flow) => flow(flowArgs)))
+ this.debug('Flow run complete')
+ if (flowRunResults.find((result) => result.status === 'rejected')) {
+ this.debug('Failed')
+ this._outcome.resolve(JobOutcome.Failed)
+ } else {
+ this.debug('Succeeded')
+ this._outcome.resolve(JobOutcome.Succeeded)
+ }
-class FlowManager {
+class FlowManager extends EventEmitter {
private readonly flowArgs: FlowArgs
- private pendingJobs: Job[]
- private completedJobs: Job[]
+ private _jobs: Job[] = []
constructor(flowArgs: FlowArgs) {
+ super()
this.flowArgs = flowArgs
- public createJob(flows: Flow[]): Job {
- const job = new Job(this, this.flowArgs, flows)
+ public createJob(label: string, flows: Flow[] | Flow): Job {
+ const arrFlows: Array<Flow> = []
+ const job = new Job(this, arrFlows.concat(flows), label)
- this.pendingJobs.push(job)
+ this._jobs.push(job)
- // TODO: return a limited interface only for configuring job before it runs
return job
- // Run the jobs in parallel where possible, followed by their dependents
public async run(): Promise<void> {
+ this.emit('run', this.flowArgs)
+ const jobOutcomes = await Promise.all(this._jobs.map((job) => job.outcome))
+ const someJobDidNotSucceed = jobOutcomes.find((outcome) => outcome !== JobOutcome.Succeeded)
+ if (someJobDidNotSucceed) {
+ throw new Error('Some jobs failed or skipped')
+ }
@@ -74,7 +155,7 @@ export async function scenario(
query: QueryNodeApi
env: NodeJS.ProcessEnv
debug: Debugger.Debugger
- job: (flows: Flow[]) => Job
+ job: (label: string, flows: Flow[] | Flow) => Job
}) => Promise<void>
): Promise<void> {
// Load env variables
@@ -104,7 +185,12 @@ export async function scenario(
// Does the scenario really need the flow args?
await fn({ api, query, env, debug, job: flowManager.createJob.bind(flowManager) })
- await flowManager.run()
+ try {
+ await flowManager.run()
+ } catch (err) {
+ console.error(err)
+ process.exit(-1)
+ }
// Note: disconnecting and then reconnecting to the chain in the same process
// doesn't seem to work!