Kaynağa Gözat

create-bucket-family, set-buckets-per-bag-limit, create-bucket, update-bag

Leszek Wiesner 3 yıl önce
ebeveyn
işleme
56fe75c940

+ 13 - 0
distributor-node/config.yml

@@ -0,0 +1,13 @@
+endpoints:
+  queryNode: http://localhost:8081/graphql
+  substrateNode: ws://localhost:9944
+directories:
+  data: /data
+  cache: /cache
+  logs: /logs
+log:
+  file: debug
+  console: debug
+port: 3334
+keys: [//Alice]
+buckets: [1]

+ 2 - 1
distributor-node/package.json

@@ -92,7 +92,8 @@
     "build": "tsc --build tsconfig.json",
     "lint": "eslint ./src --ext .ts",
     "format": "prettier ./ --write",
-    "checks": "tsc --noEmit --pretty && prettier ./ --check && yarn lint"
+    "checks": "tsc --noEmit --pretty && prettier ./ --check && yarn lint",
+    "cli": "./bin/run"
   },
   "types": "lib/index.d.ts"
 }

+ 11 - 0
distributor-node/scripts/init-bucket.sh

@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+SCRIPT_PATH="$(dirname "${BASH_SOURCE[0]}")"
+cd $SCRIPT_PATH
+
+CLI=../bin/run
+CONFIG_PATH="../config.yml"
+${CLI} leader:set-buckets-per-bag-limit -l 10 -c ${CONFIG_PATH} -y
+FAMILY_ID=`${CLI} leader:create-bucket-family -c "${CONFIG_PATH}" -y`
+BUCKET_ID=`${CLI} leader:create-bucket --family "${FAMILY_ID}" --acceptBags -c "${CONFIG_PATH}" -y`
+${CLI} leader:update-bag -b static:council --family "${FAMILY_ID}" --add "${BUCKET_ID}" -c "${CONFIG_PATH}" -y

+ 1 - 1
distributor-node/src/app/index.ts

@@ -19,7 +19,7 @@ export class App {
 
   constructor(config: ReadonlyConfig) {
     this.config = config
-    this.logging = new LoggingService(config)
+    this.logging = LoggingService.withAppConfig(config)
     this.stateCache = new StateCacheService(config, this.logging)
     this.content = new ContentService(config, this.logging, this.stateCache)
     this.networking = new NetworkingService(config, this.stateCache, this.logging)

+ 6 - 0
distributor-node/src/command-base/ExitCodes.ts

@@ -0,0 +1,6 @@
+enum ExitCodes {
+  OK = 0,
+  Error = 1,
+  ApiError = 200,
+}
+export = ExitCodes

+ 57 - 0
distributor-node/src/command-base/accounts.ts

@@ -0,0 +1,57 @@
+import ApiCommandBase from './api'
+import { AccountId } from '@polkadot/types/interfaces'
+import { Keyring } from '@polkadot/api'
+import { KeyringInstance, KeyringOptions, KeyringPair } from '@polkadot/keyring/types'
+import { CLIError } from '@oclif/errors'
+
+export const DEFAULT_ACCOUNT_TYPE = 'sr25519'
+export const KEYRING_OPTIONS: KeyringOptions = {
+  type: DEFAULT_ACCOUNT_TYPE,
+}
+
+/**
+ * Abstract base class for account-related commands.
+ */
+export default abstract class AccountsCommandBase extends ApiCommandBase {
+  private keyring!: KeyringInstance
+
+  isKeyAvailable(key: AccountId | string): boolean {
+    return this.keyring.getPairs().some((p) => p.address === key.toString())
+  }
+
+  getPairs(includeDevAccounts = true): KeyringPair[] {
+    return this.keyring.getPairs().filter((p) => includeDevAccounts || !p.meta.isTesting)
+  }
+
+  getPair(key: string): KeyringPair {
+    const pair = this.keyring.getPair(key)
+    if (!pair) {
+      throw new CLIError(`Required key for account ${key} is not available`)
+    }
+    return pair
+  }
+
+  async getDecodedPair(key: string): Promise<KeyringPair> {
+    // Just for Joystream CLI compatibility currently
+    return this.getPair(key)
+  }
+
+  initKeyring(): void {
+    this.keyring = new Keyring(KEYRING_OPTIONS)
+    this.appConfig.keys.forEach((suri) => this.keyring.addFromUri(suri))
+  }
+
+  async getDistributorLeadKey(): Promise<string> {
+    const currentLead = await this.api.query.distributionWorkingGroup.currentLead()
+    if (!currentLead.isSome) {
+      throw new CLIError('There is no active distributor working group lead currently')
+    }
+    const worker = await this.api.query.distributionWorkingGroup.workerById(currentLead.unwrap())
+    return worker.role_account_id.toString()
+  }
+
+  async init(): Promise<void> {
+    await super.init()
+    await this.initKeyring()
+  }
+}

+ 40 - 0
distributor-node/src/command-base/api.ts

@@ -0,0 +1,40 @@
+import DefaultCommandBase from './default'
+import { CLIError } from '@oclif/errors'
+import { SubmittableResult } from '@polkadot/api'
+import { KeyringPair } from '@polkadot/keyring/types'
+import chalk from 'chalk'
+import { SubmittableExtrinsic } from '@polkadot/api/types'
+import { formatBalance } from '@polkadot/util'
+import { ExtrinsicFailedError, RuntimeApi } from '../services/networking/runtime/api'
+import ExitCodes from './ExitCodes'
+
+/**
+ * Abstract base class for commands that require access to the API.
+ */
+export default abstract class ApiCommandBase extends DefaultCommandBase {
+  protected api!: RuntimeApi
+
+  async init(): Promise<void> {
+    await super.init()
+    this.api = await RuntimeApi.create(this.logging, this.appConfig.endpoints.substrateNode)
+  }
+
+  async sendAndFollowTx(account: KeyringPair, tx: SubmittableExtrinsic<'promise'>): Promise<SubmittableResult> {
+    // Calculate fee and ask for confirmation
+    const fee = await this.api.estimateFee(account, tx)
+
+    await this.requireConfirmation(
+      `Tx fee of ${chalk.cyan(formatBalance(fee))} will be deduced from you account, do you confirm the transfer?`
+    )
+
+    try {
+      const res = await this.api.sendExtrinsic(account, tx)
+      return res
+    } catch (e) {
+      if (e instanceof ExtrinsicFailedError) {
+        throw new CLIError(`Extrinsic failed! ${e.message}`, { exit: ExitCodes.ApiError })
+      }
+      throw e
+    }
+  }
+}

+ 92 - 13
distributor-node/src/command-base/default.ts

@@ -1,18 +1,97 @@
-import Ajv from 'ajv'
-import { JSONSchema4 } from 'json-schema'
-import Command from '@oclif/command'
-import { CLIError } from '@oclif/errors/lib/errors/cli'
+import Command, { flags as oclifFlags } from '@oclif/command'
+import inquirer from 'inquirer'
+import ExitCodes from './ExitCodes'
+import { ReadonlyConfig } from '../types/config'
+import { ConfigParserService } from '../services/parsers/ConfigParserService'
+import { LoggingService } from '../services/logging'
+import { Logger } from 'winston'
+import { BagIdParserService } from '../services/parsers/BagIdParserService'
 
+export const flags = {
+  ...oclifFlags,
+  integerArr: oclifFlags.build({
+    parse: (value: string) => {
+      const arr: number[] = value.split(',').map((v) => {
+        if (!/^-?\d+$/.test(v)) {
+          throw new Error(`Expected comma-separated integers, but received: ${value}`)
+        }
+        return parseInt(v)
+      })
+      return arr
+    },
+  }),
+  bagId: oclifFlags.build({
+    parse: (value: string) => {
+      const parser = new BagIdParserService()
+      return parser.parseBagId(value)
+    },
+    description: `Bag ID. Format: {bag_type}:{sub_type}:{id}.
+    - Bag types: 'static', 'dynamic'
+    - Sub types: 'static:council', 'static:wg', 'dynamic:member', 'dynamic:channel'
+    - Id:
+      - absent for 'static:council'
+      - working group name for 'static:wg'
+      - integer for 'dynamic:member' and 'dynamic:channel'
+    Examples:
+    - static:council
+    - static:wg:storage
+    - dynamic:member:4`,
+  }),
+}
 export default abstract class DefaultCommandBase extends Command {
-  asValidatedInput<ValidInputType>(schema: JSONSchema4, input: unknown, inputName = 'Input'): ValidInputType {
-    const ajv = new Ajv({ allErrors: true })
-    const valid = ajv.validate(schema, input) as boolean
-    if (!valid) {
-      throw new CLIError(
-        `${inputName} is not valid:\n` +
-          ajv.errors?.map((e) => `${e.instancePath}: ${e.message} (${JSON.stringify(e.params)})`).join('\n')
-      )
+  protected appConfig!: ReadonlyConfig
+  protected logging!: LoggingService
+  protected autoConfirm!: boolean
+  private logger!: Logger
+
+  static flags = {
+    yes: flags.boolean({
+      required: false,
+      default: false,
+      description: 'Answer "yes" to any prompt, skipping any manual confirmations',
+      char: 'y',
+    }),
+    configPath: flags.string({
+      required: false,
+      default: './config.yml',
+      description: 'Path to config JSON/YAML file (relative to current working directory)',
+      char: 'c',
+    }),
+  }
+
+  async init(): Promise<void> {
+    const { configPath, yes } = this.parse(this.constructor as typeof DefaultCommandBase).flags
+    const configParser = new ConfigParserService()
+    this.appConfig = configParser.loadConfing(configPath) as ReadonlyConfig
+    this.logging = LoggingService.withCLIConfig()
+    this.logger = this.logging.createLogger('CLI')
+    this.autoConfirm = !!(process.env.AUTO_CONFIRM === 'true' || parseInt(process.env.AUTO_CONFIRM || '') || yes)
+  }
+
+  public log(message: string, meta?: unknown[]): void {
+    this.logger.info(message, meta)
+  }
+
+  public output(value: unknown): void {
+    console.log(value)
+  }
+
+  async requireConfirmation(
+    message = 'Are you sure you want to execute this action?',
+    defaultVal = false
+  ): Promise<void> {
+    if (this.autoConfirm) {
+      return
     }
-    return input as ValidInputType
+    const { confirmed } = await inquirer.prompt([{ type: 'confirm', name: 'confirmed', message, default: defaultVal }])
+    if (!confirmed) {
+      this.exit(ExitCodes.OK)
+    }
+  }
+
+  async finally(err: any): Promise<void> {
+    if (!err) this.exit(ExitCodes.OK)
+    console.error(err)
+    super.finally(err)
   }
 }

+ 93 - 0
distributor-node/src/commands/dev/init.ts

@@ -0,0 +1,93 @@
+import { MemberId } from '@joystream/types/members'
+import AccountsCommandBase from '../../command-base/accounts'
+import DefaultCommandBase from '../../command-base/default'
+
+const ALICE = '5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY'
+
+export default class DevInit extends AccountsCommandBase {
+  static description = 'Initialize development environment. Sets Alice as distributor working group leader.'
+
+  static flags = {
+    ...DefaultCommandBase.flags,
+  }
+
+  async run(): Promise<void> {
+    const { api } = this
+
+    if (!api.isDevelopment) {
+      this.error('Not connected to dev chain!')
+    }
+
+    const SudoKeyPair = this.getPair(ALICE)
+    const LeadKeyPair = this.getPair(ALICE)
+
+    // Create membership if not already created
+    const members = await api.query.members.memberIdsByControllerAccountId(LeadKeyPair.address)
+
+    let memberId: MemberId | undefined = members.toArray()[0]
+
+    if (memberId === undefined) {
+      const res = await this.api.sendExtrinsic(LeadKeyPair, api.tx.members.buyMembership(0, 'alice', null, null))
+      memberId = this.api.getEvent(res, 'members', 'MemberRegistered').data[0]
+    }
+
+    // Create a new lead opening.
+    const currentLead = await api.query.distributionWorkingGroup.currentLead()
+    if (currentLead.isSome) {
+      this.log('Distributor lead already exists, skipping...')
+      return
+    }
+
+    this.log(`Making member id: ${memberId} the content lead.`)
+
+    // Create curator lead opening
+    const addOpeningRes = await this.api.sendExtrinsic(
+      SudoKeyPair,
+      this.api.sudo(
+        api.tx.distributionWorkingGroup.addOpening(
+          { CurrentBlock: null },
+          { max_review_period_length: 9999 },
+          'dev distributor lead opening',
+          'Leader'
+        )
+      )
+    )
+
+    const openingAddedEvent = this.api.getEvent(addOpeningRes, 'distributionWorkingGroup', 'OpeningAdded')
+    const openingId = openingAddedEvent.data[0]
+
+    // Apply to lead opening
+    const applyRes = await this.api.sendExtrinsic(
+      LeadKeyPair,
+      this.api.tx.distributionWorkingGroup.applyOnOpening(
+        memberId, // member id
+        openingId, // opening id
+        LeadKeyPair.address, // address
+        null, // opt role stake
+        null, // opt appl. stake
+        'dev distributor lead application' // human_readable_text
+      )
+    )
+
+    const appliedEvent = this.api.getEvent(applyRes, 'distributionWorkingGroup', 'AppliedOnOpening')
+    const applicationId = appliedEvent.data[1]
+
+    // Begin review period
+    await this.api.sendExtrinsic(
+      SudoKeyPair,
+      this.api.sudo(this.api.tx.distributionWorkingGroup.beginApplicantReview(openingId))
+    )
+
+    // Fill opening
+    await this.api.sendExtrinsic(
+      SudoKeyPair,
+      this.api.sudo(
+        this.api.tx.distributionWorkingGroup.fillOpening(
+          openingId,
+          api.createType('ApplicationIdSet', [applicationId]),
+          null
+        )
+      )
+    )
+  }
+}

+ 25 - 0
distributor-node/src/commands/leader/create-bucket-family.ts

@@ -0,0 +1,25 @@
+import AccountsCommandBase from '../../command-base/accounts'
+import DefaultCommandBase from '../../command-base/default'
+
+export default class LeaderCreateBucketFamily extends AccountsCommandBase {
+  static description = `Create new distribution bucket family. Requires distribution working group leader permissions.`
+
+  static flags = {
+    ...DefaultCommandBase.flags,
+  }
+
+  async run(): Promise<void> {
+    const leadKey = await this.getDistributorLeadKey()
+
+    this.log('Creating new distribution bucket family...')
+    const result = await this.sendAndFollowTx(
+      await this.getDecodedPair(leadKey),
+      this.api.tx.storage.createDistributionBucketFamily()
+    )
+    const event = this.api.getEvent(result, 'storage', 'DistributionBucketFamilyCreated')
+
+    this.log('Bucket family succesfully created!')
+    const bucketFamilyId = event.data[0]
+    this.output(bucketFamilyId.toString())
+  }
+}

+ 37 - 0
distributor-node/src/commands/leader/create-bucket.ts

@@ -0,0 +1,37 @@
+import { flags } from '@oclif/command'
+import AccountsCommandBase from '../../command-base/accounts'
+import DefaultCommandBase from '../../command-base/default'
+
+export default class LeaderCreateBucket extends AccountsCommandBase {
+  static description = `Create new distribution bucket. Requires distribution working group leader permissions.`
+
+  static flags = {
+    family: flags.integer({
+      char: 'f',
+      description: 'Distribution bucket family id',
+      required: true,
+    }),
+    acceptBags: flags.boolean({
+      char: 'a',
+      description: 'Whether the new bucket should accept new bags',
+      default: false,
+    }),
+    ...DefaultCommandBase.flags,
+  }
+
+  async run(): Promise<void> {
+    const { family, acceptBags } = this.parse(LeaderCreateBucket).flags
+    const leadKey = await this.getDistributorLeadKey()
+
+    this.log('Creating new distribution bucket...')
+    const result = await this.sendAndFollowTx(
+      await this.getDecodedPair(leadKey),
+      this.api.tx.storage.createDistributionBucket(family, acceptBags)
+    )
+    const event = this.api.getEvent(result, 'storage', 'DistributionBucketCreated')
+
+    this.log('Bucket succesfully created!')
+    const bucketId = event.data[0]
+    this.output(bucketId.toString())
+  }
+}

+ 28 - 0
distributor-node/src/commands/leader/set-buckets-per-bag-limit.ts

@@ -0,0 +1,28 @@
+import { flags } from '@oclif/command'
+import AccountsCommandBase from '../../command-base/accounts'
+import DefaultCommandBase from '../../command-base/default'
+
+export default class LeaderSetBucketsPerBagLimit extends AccountsCommandBase {
+  static description = `Set max. distribution buckets per bag limit. Requires distribution working group leader permissions.`
+
+  static flags = {
+    limit: flags.integer({
+      char: 'l',
+      description: 'New limit value',
+      required: true,
+    }),
+    ...DefaultCommandBase.flags,
+  }
+
+  async run(): Promise<void> {
+    const { limit } = this.parse(LeaderSetBucketsPerBagLimit).flags
+    const leadKey = await this.getDistributorLeadKey()
+
+    this.log(`Setting new buckets per bag limit (${limit})...`)
+    await this.sendAndFollowTx(
+      await this.getDecodedPair(leadKey),
+      this.api.tx.storage.updateDistributionBucketsPerBagLimit(limit)
+    )
+    this.log('Limit succesfully updated!')
+  }
+}

+ 50 - 0
distributor-node/src/commands/leader/update-bag.ts

@@ -0,0 +1,50 @@
+import AccountsCommandBase from '../../command-base/accounts'
+import DefaultCommandBase, { flags } from '../../command-base/default'
+
+export default class LeaderUpdateBag extends AccountsCommandBase {
+  static description = 'Add/remove distribution buckets from a bag.'
+
+  static flags = {
+    id: flags.bagId({
+      char: 'b',
+      required: true,
+    }),
+    family: flags.integer({
+      char: 'f',
+      description: 'ID of the distribution bucket family',
+      required: true,
+    }),
+    add: flags.integerArr({
+      char: 'a',
+      description: 'IDs of buckets to add to bag',
+      default: [],
+    }),
+    remove: flags.integerArr({
+      char: 'r',
+      description: 'IDs of buckets to remove from bag',
+      default: [],
+    }),
+    ...DefaultCommandBase.flags,
+  }
+
+  async run(): Promise<void> {
+    const { id, family, add, remove } = this.parse(LeaderUpdateBag).flags
+    const leadKey = await this.getDistributorLeadKey()
+
+    this.log(
+      `Updating distribution buckets for bag ${id} (adding: ${add.join(',' || 'NONE')}, removing: ${
+        remove.join(',') || 'NONE'
+      })...`
+    )
+    await this.sendAndFollowTx(
+      await this.getDecodedPair(leadKey),
+      this.api.tx.storage.updateDistributionBucketsForBag(
+        id,
+        family,
+        this.api.createType('DistributionBucketIdSet', add),
+        this.api.createType('DistributionBucketIdSet', remove)
+      )
+    )
+    this.log('Bag succesfully updated!')
+  }
+}

+ 1 - 42
distributor-node/src/commands/start.ts

@@ -1,52 +1,11 @@
-import fs from 'fs'
-import path from 'path'
-import YAML from 'yaml'
-import { CLIError } from '@oclif/errors'
 import DefaultCommandBase from '../command-base/default'
-import { Config, ReadonlyConfig } from '../types/config'
-import { configSchema } from '../validation/schemas'
 import { App } from '../app'
-import _ from 'lodash'
 
 export default class StartNode extends DefaultCommandBase {
   static description = 'Start the node'
 
-  static examples = [`$ joystream-distributor start /path/to/config.yml`]
-
-  // TODO: Allow overriding config through flags
-
-  static args = [
-    {
-      name: 'config',
-      description: 'Path to YAML configuration file',
-      default: './config.yml',
-    },
-  ]
-
-  resolveDirectoryPaths(paths: Config['directories'], configFilePath: string): Config['directories'] {
-    return _.mapValues(paths, (v) => path.resolve(configFilePath, v))
-  }
-
-  getConfing(configPath: string): Config {
-    const fileContent = fs.readFileSync(configPath).toString()
-    let config: unknown
-    if (path.extname(configPath) === '.json') {
-      config = JSON.parse(fileContent)
-    } else if (path.extname(configPath) === '.yml') {
-      config = YAML.parse(fileContent)
-    } else {
-      throw new CLIError('Unrecognized config format (use .yml or .json)')
-    }
-
-    return this.asValidatedInput<Config>(configSchema, config, 'Configuration file')
-  }
-
   async run(): Promise<void> {
-    const { args } = this.parse(StartNode)
-    const configPath = args.config
-    const config = this.getConfing(configPath)
-    config.directories = this.resolveDirectoryPaths(config.directories, configPath)
-    const app = new App(config as ReadonlyConfig)
+    const app = new App(this.appConfig)
     app.start()
   }
 

+ 31 - 2
distributor-node/src/services/logging/LoggingService.ts

@@ -5,7 +5,11 @@ import { ReadonlyConfig } from '../../types'
 export class LoggingService {
   private loggerOptions: LoggerOptions
 
-  public constructor(config: ReadonlyConfig) {
+  private constructor(options: LoggerOptions) {
+    this.loggerOptions = options
+  }
+
+  public static withAppConfig(config: ReadonlyConfig): LoggingService {
     const transports: winston.LoggerOptions['transports'] = [
       new winston.transports.File({
         filename: `${config.directories.logs}/logs.json`,
@@ -19,10 +23,35 @@ export class LoggingService {
         })
       )
     }
-    this.loggerOptions = {
+    return new LoggingService({
       format: escFormat(),
       transports,
+    })
+  }
+
+  public static withCLIConfig(): LoggingService {
+    const colors = {
+      error: 'red',
+      warn: 'yellow',
+      info: 'green',
+      http: 'magenta',
+      debug: 'grey',
     }
+
+    winston.addColors(colors)
+
+    const format = winston.format.combine(
+      winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }),
+      winston.format.colorize({ all: true }),
+      winston.format.printf((info) => `${info.timestamp} ${info.label} ${info.level}: ${info.message}`)
+    )
+    return new LoggingService({
+      transports: new winston.transports.Console({
+        // Log everything to stderr, only the command output value will be written to stdout
+        stderrLevels: Object.keys(winston.config.npm.levels),
+      }),
+      format,
+    })
   }
 
   public createLogger(label: string): Logger {

+ 145 - 1
distributor-node/src/services/networking/runtime/api.ts

@@ -1 +1,145 @@
-export class RuntimeApi {}
+import { types } from '@joystream/types/'
+import { ApiPromise, WsProvider, SubmittableResult } from '@polkadot/api'
+import { SubmittableExtrinsic, AugmentedEvent } from '@polkadot/api/types'
+import { KeyringPair } from '@polkadot/keyring/types'
+import { Balance } from '@polkadot/types/interfaces'
+import { formatBalance } from '@polkadot/util'
+import { IEvent } from '@polkadot/types/types'
+import { DispatchError } from '@polkadot/types/interfaces/system'
+import { LoggingService } from '../../logging'
+import { Logger } from 'winston'
+
+export class ExtrinsicFailedError extends Error {}
+
+export class RuntimeApi {
+  private _api: ApiPromise
+  private logger: Logger
+
+  public isDevelopment = false
+
+  private constructor(logging: LoggingService, originalApi: ApiPromise, isDevelopment: boolean) {
+    this.isDevelopment = isDevelopment
+    this.logger = logging.createLogger('SubstrateApi')
+    this._api = originalApi
+  }
+
+  static async create(
+    logging: LoggingService,
+    apiUri: string,
+    metadataCache?: Record<string, any>
+  ): Promise<RuntimeApi> {
+    const { api, chainType } = await RuntimeApi.initApi(apiUri, metadataCache)
+    return new RuntimeApi(logging, api, chainType.isDevelopment || chainType.isLocal)
+  }
+
+  private static async initApi(apiUri: string, metadataCache?: Record<string, any>) {
+    const wsProvider: WsProvider = new WsProvider(apiUri)
+    const api = await ApiPromise.create({ provider: wsProvider, types, metadata: metadataCache })
+
+    // Initializing some api params based on pioneer/packages/react-api/Api.tsx
+    const [properties, chainType] = await Promise.all([api.rpc.system.properties(), api.rpc.system.chainType()])
+
+    const tokenSymbol = properties.tokenSymbol.unwrap()[0].toString()
+    const tokenDecimals = properties.tokenDecimals.unwrap()[0].toNumber()
+
+    // formatBlanace config
+    formatBalance.setDefaults({
+      decimals: tokenDecimals,
+      unit: tokenSymbol,
+    })
+
+    return { api, properties, chainType }
+  }
+
+  public get query(): ApiPromise['query'] {
+    return this._api.query
+  }
+
+  public get tx(): ApiPromise['tx'] {
+    return this._api.tx
+  }
+
+  public get consts(): ApiPromise['consts'] {
+    return this._api.consts
+  }
+
+  public get derive(): ApiPromise['derive'] {
+    return this._api.derive
+  }
+
+  public get createType(): ApiPromise['createType'] {
+    return this._api.createType.bind(this._api)
+  }
+
+  public sudo(tx: SubmittableExtrinsic<'promise'>): SubmittableExtrinsic<'promise'> {
+    return this._api.tx.sudo.sudo(tx)
+  }
+
+  public async estimateFee(account: KeyringPair, tx: SubmittableExtrinsic<'promise'>): Promise<Balance> {
+    const paymentInfo = await tx.paymentInfo(account)
+    return paymentInfo.partialFee
+  }
+
+  public findEvent<
+    S extends keyof ApiPromise['events'] & string,
+    M extends keyof ApiPromise['events'][S] & string,
+    EventType = ApiPromise['events'][S][M] extends AugmentedEvent<'promise', infer T> ? IEvent<T> : never
+  >(result: SubmittableResult, section: S, method: M): EventType | undefined {
+    return result.findRecord(section, method)?.event as EventType | undefined
+  }
+
+  public getEvent<
+    S extends keyof ApiPromise['events'] & string,
+    M extends keyof ApiPromise['events'][S] & string,
+    EventType = ApiPromise['events'][S][M] extends AugmentedEvent<'promise', infer T> ? IEvent<T> : never
+  >(result: SubmittableResult, section: S, method: M): EventType {
+    const event = this.findEvent(result, section, method)
+    if (!event) {
+      throw new Error(`Cannot find expected ${section}.${method} event in result: ${result.toHuman()}`)
+    }
+    return (event as unknown) as EventType
+  }
+
+  sendExtrinsic(keyPair: KeyringPair, tx: SubmittableExtrinsic<'promise'>): Promise<SubmittableResult> {
+    this.logger.info(`Sending ${tx.method.section}.${tx.method.method} extrinsic from ${keyPair.address}`)
+    return new Promise((resolve, reject) => {
+      let unsubscribe: () => void
+      tx.signAndSend(keyPair, {}, (result) => {
+        // Implementation loosely based on /pioneer/packages/react-signer/src/Modal.tsx
+        if (!result || !result.status) {
+          return
+        }
+
+        if (result.status.isInBlock) {
+          unsubscribe()
+          result.events
+            .filter(({ event }) => event.section === 'system')
+            .forEach(({ event }) => {
+              if (event.method === 'ExtrinsicFailed') {
+                const dispatchError = event.data[0] as DispatchError
+                let errorMsg = dispatchError.toString()
+                if (dispatchError.isModule) {
+                  try {
+                    const { name, documentation } = this._api.registry.findMetaError(dispatchError.asModule)
+                    errorMsg = `${name} (${documentation})`
+                  } catch (e) {
+                    // This probably means we don't have this error in the metadata
+                    // In this case - continue (we'll just display dispatchError.toString())
+                  }
+                }
+                reject(new ExtrinsicFailedError(`Extrinsic execution error: ${errorMsg}`))
+              } else if (event.method === 'ExtrinsicSuccess') {
+                resolve(result)
+              }
+            })
+        } else if (result.isError) {
+          reject(new ExtrinsicFailedError('Extrinsic execution error!'))
+        }
+      })
+        .then((unsubFunc) => (unsubscribe = unsubFunc))
+        .catch((e) =>
+          reject(new ExtrinsicFailedError(`Cannot send the extrinsic: ${e.message ? e.message : JSON.stringify(e)}`))
+        )
+    })
+  }
+}

+ 47 - 0
distributor-node/src/services/parsers/BagIdParserService.ts

@@ -0,0 +1,47 @@
+import { BagId } from '@joystream/types/storage'
+import { registry } from '@joystream/types'
+import { createType } from '@polkadot/types'
+import { InterfaceTypes } from '@polkadot/types/types'
+
+export class BagIdParserService {
+  private createType<T extends keyof InterfaceTypes>(type: T, value: any) {
+    return createType(registry, type, value)
+  }
+
+  public parseBagId(bagId: string): BagId {
+    const bagIdParts = bagId.toLowerCase().split(':')
+
+    if (bagIdParts.length > 3 || bagIdParts.length < 2) {
+      throw new Error(`Invalid bagId: ${bagId}`)
+    }
+
+    if (bagIdParts[0] === 'static') {
+      return this.parseStaticBagId(bagId, bagIdParts)
+    }
+
+    if (bagIdParts[0] === 'dynamic') {
+      return this.parseDynamicBagId()
+    }
+
+    throw new Error(`Invalid bagId: ${bagId}`)
+  }
+
+  public parseStaticBagId(bagId: string, bagIdParts: string[]): BagId {
+    if (bagIdParts[1] === 'council') {
+      if (bagIdParts.length === 2) {
+        const staticBagId = this.createType('StaticBagId', 'Council')
+        const constructedBagId = this.createType('BagId', {
+          'Static': staticBagId,
+        })
+
+        return constructedBagId
+      }
+    }
+
+    throw new Error(`Invalid bagId: ${bagId}`)
+  }
+
+  public parseDynamicBagId(): BagId {
+    throw new Error('Function not implemented.')
+  }
+}

+ 35 - 0
distributor-node/src/services/parsers/ConfigParserService.ts

@@ -0,0 +1,35 @@
+import { ValidationService } from '../validation/ValidationService'
+import { Config } from '../../types'
+import fs from 'fs'
+import path from 'path'
+import YAML from 'yaml'
+import _ from 'lodash'
+
+export class ConfigParserService {
+  validator: ValidationService
+
+  constructor() {
+    this.validator = new ValidationService()
+  }
+
+  public resolveConfigDirectoryPaths(paths: Config['directories'], configFilePath: string): Config['directories'] {
+    return _.mapValues(paths, (v) => path.resolve(configFilePath, v))
+  }
+
+  public loadConfing(configPath: string): Config {
+    const fileContent = fs.readFileSync(configPath).toString()
+    let inputConfig: unknown
+    if (path.extname(configPath) === '.json') {
+      inputConfig = JSON.parse(fileContent)
+    } else if (path.extname(configPath) === '.yml' || path.extname(configPath) === '.yaml') {
+      inputConfig = YAML.parse(fileContent)
+    } else {
+      throw new Error('Unrecognized config format (use .yml or .json)')
+    }
+
+    const config = this.validator.validate('Config', inputConfig)
+    config.directories = this.resolveConfigDirectoryPaths(config.directories, configPath)
+
+    return config
+  }
+}

+ 30 - 0
distributor-node/src/services/validation/ValidationService.ts

@@ -0,0 +1,30 @@
+import Ajv from 'ajv'
+import { SchemaKey, schemas, TypeBySchemaKey } from './schemas'
+
+class ValidationError extends Error {
+  public readonly errors: string[]
+
+  public constructor(message: string, errors: string[]) {
+    super(message)
+    this.errors = errors
+  }
+}
+
+export class ValidationService {
+  private ajv: Ajv
+
+  public constructor() {
+    this.ajv = new Ajv({ allErrors: true, schemas })
+  }
+
+  validate(schemaKey: SchemaKey, input: unknown): TypeBySchemaKey<SchemaKey> {
+    const valid = this.ajv.validate(schemaKey, input) as boolean
+    if (!valid) {
+      throw new ValidationError(
+        `${schemaKey} is not valid`,
+        this.ajv.errors?.map((e) => `${e.instancePath}: ${e.message} (${JSON.stringify(e.params)})`) || []
+      )
+    }
+    return input as TypeBySchemaKey<SchemaKey>
+  }
+}

+ 3 - 3
distributor-node/src/validation/generateTypes.ts → distributor-node/src/services/validation/generateTypes.ts

@@ -1,11 +1,11 @@
 import fs from 'fs'
 import path from 'path'
 import { compile } from 'json-schema-to-typescript'
-import { configSchema } from './schemas'
+import { schemas } from './schemas'
 
 // eslint-disable-next-line @typescript-eslint/no-var-requires
 const prettierConfig = require('@joystream/prettier-config')
 
-compile(configSchema, 'ConfigJson', { style: prettierConfig }).then((output) =>
-  fs.writeFileSync(path.resolve(__dirname, '../types/generated/ConfigJson.d.ts'), output)
+compile(schemas.Config, 'ConfigJson', { style: prettierConfig }).then((output) =>
+  fs.writeFileSync(path.resolve(__dirname, '../../types/generated/ConfigJson.d.ts'), output)
 )

+ 0 - 0
distributor-node/src/validation/schemas/configSchema.ts → distributor-node/src/services/validation/schemas/configSchema.ts


+ 12 - 0
distributor-node/src/services/validation/schemas/index.ts

@@ -0,0 +1,12 @@
+import { ConfigJson } from '../../../types/generated/ConfigJson'
+import { configSchema } from './configSchema'
+
+export const schemas = {
+  Config: configSchema,
+} as const
+
+export type SchemaKey = keyof typeof schemas & string
+
+export type TypeBySchemaKey<T extends SchemaKey> = T extends 'Config' ? ConfigJson : never
+
+export default schemas

+ 0 - 0
distributor-node/src/validation/schemas/utils.ts → distributor-node/src/services/validation/schemas/utils.ts


+ 0 - 1
distributor-node/src/validation/schemas/index.ts

@@ -1 +0,0 @@
-export { configSchema } from './configSchema'