Leszek Wiesner преди 3 години
родител
ревизия
e4d8062531

+ 0 - 1
distributor-node/config.yml

@@ -37,5 +37,4 @@ keys:
   # - mnemonic: "escape naive annual throw tragic achieve grunt verify cram note harvest problem"
   #   type: ed25519
   # - keyfile: "/path/to/keyfile.json"
-buckets: 'all'
 workerId: 0

+ 7 - 11
distributor-node/src/api-spec/operator.yml

@@ -104,19 +104,15 @@ components:
           minimum: 0
     SetBucketsOperation:
       type: object
-      required:
-        - buckets
       properties:
         buckets:
-          oneOf:
-            - type: string
-              enum: ['all']
-              description: All buckets assigned to configured workerId.
-            - type: array
-              minItems: 1
-              items:
-                type: integer
-                minimum: 0
+          description: "Set of bucket ids to be distributed by the node.
+            If not provided - all buckets assigned to currently configured worker will be distributed."
+          type: array
+          minItems: 1
+          items:
+            type: integer
+            minimum: 0
 
 security:
   - OperatorAuth: []

+ 22 - 22
distributor-node/src/app/index.ts

@@ -52,32 +52,32 @@ export class App {
     }
   }
 
-  private checkConfigDirectories(): void {
-    Object.entries(this.config.directories).forEach(([name, path]) => {
-      if (path === undefined) {
-        return
-      }
-      const dirInfo = `${name} directory (${path})`
-      if (!fs.existsSync(path)) {
-        try {
-          fs.mkdirSync(path, { recursive: true })
-        } catch (e) {
-          throw new Error(`${dirInfo} doesn't exist and cannot be created!`)
-        }
-      }
+  private checkConfigDir(name: string, path: string): void {
+    const dirInfo = `${name} directory (${path})`
+    if (!fs.existsSync(path)) {
       try {
-        fs.accessSync(path, fs.constants.R_OK)
+        fs.mkdirSync(path, { recursive: true })
       } catch (e) {
-        throw new Error(`${dirInfo} is not readable`)
+        throw new Error(`${dirInfo} doesn't exist and cannot be created!`)
       }
-      try {
-        fs.accessSync(path, fs.constants.W_OK)
-      } catch (e) {
-        throw new Error(`${dirInfo} is not writable`)
-      }
-    })
+    }
+    try {
+      fs.accessSync(path, fs.constants.R_OK)
+    } catch (e) {
+      throw new Error(`${dirInfo} is not readable`)
+    }
+    try {
+      fs.accessSync(path, fs.constants.W_OK)
+    } catch (e) {
+      throw new Error(`${dirInfo} is not writable`)
+    }
+  }
 
-    // TODO: Logging dir if specified
+  private checkConfigDirectories(): void {
+    Object.entries(this.config.directories).forEach(([name, path]) => this.checkConfigDir(name, path))
+    if (this.config.logs?.file) {
+      this.checkConfigDir('logs.file.path', this.config.logs.file.path)
+    }
   }
 
   public async start(): Promise<void> {

+ 1 - 1
distributor-node/src/command-base/accounts.ts

@@ -124,7 +124,7 @@ export default abstract class AccountsCommandBase extends ApiCommandBase {
 
   initKeyring(): void {
     this.keyring = new Keyring(KEYRING_OPTIONS)
-    this.appConfig.keys.forEach((keyData) => {
+    this.appConfig.keys?.forEach((keyData) => {
       if ('suri' in keyData) {
         this.keyring.addFromUri(keyData.suri, undefined, keyData.type)
       }

+ 5 - 3
distributor-node/src/commands/node/set-buckets.ts

@@ -32,8 +32,10 @@ export default class NodeSetBucketsCommand extends NodeCommandBase {
     if (!all && !bucketIds) {
       this.error('You must provide either --bucketIds or --all flag!', { exit: ExitCodes.InvalidInput })
     }
-    return {
-      buckets: all ? 'all' : bucketIds,
-    }
+    return all
+      ? {}
+      : {
+          buckets: bucketIds,
+        }
   }
 }

+ 56 - 86
distributor-node/src/schemas/configSchema.ts

@@ -12,17 +12,11 @@ const logLevelSchema: JSONSchema4 = {
   enum: [...Object.keys(winston.config.npm.levels)],
 }
 
-const offSwitch: JSONSchema4 = {
-  title: 'Switch off',
-  type: 'string',
-  enum: ['off'],
-}
-
 export const configSchema: JSONSchema4 = objectSchema({
   '$id': 'https://joystream.org/schemas/argus/config',
   title: 'Distributor node configuration',
  description: 'Configuration schema for distributor CLI and node',
-  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'workerId', 'limits', 'intervals', 'publicApi'],
+  required: ['id', 'endpoints', 'directories', 'limits', 'intervals', 'publicApi'],
   properties: {
     id: {
       type: 'string',
@@ -61,69 +55,54 @@ export const configSchema: JSONSchema4 = objectSchema({
     logs: objectSchema({
       description: 'Specifies the logging configuration',
       properties: {
-        file: {
-          oneOf: [
-            objectSchema({
-              title: 'File logging options',
-              properties: {
-                level: logLevelSchema,
-                path: {
-                  description: 'Path where the logs will be stored (absolute or relative to config file)',
-                  type: 'string',
-                },
-                maxFiles: {
-                  description: 'Maximum number of log files to store',
-                  type: 'integer',
-                  minimum: 1,
-                },
-                maxSize: {
-                  description: 'Maximum size of a single log file in bytes',
-                  type: 'integer',
-                  minimum: 1024,
-                },
-                frequency: {
-                  description: 'The frequency of creating new log files (regardless of maxSize)',
-                  default: 'daily',
-                  type: 'string',
-                  enum: ['yearly', 'monthly', 'daily', 'hourly'],
-                },
-                archive: {
-                  description: 'Whether to archive old logs',
-                  default: false,
-                  type: 'boolean',
-                },
-              },
-              required: ['level', 'path'],
-            }),
-            offSwitch,
-          ],
-        },
-        console: {
-          oneOf: [
-            objectSchema({
-              title: 'Console logging options',
-              properties: { level: logLevelSchema },
-              required: ['level'],
-            }),
-            offSwitch,
-          ],
-        },
-        elastic: {
-          oneOf: [
-            objectSchema({
-              title: 'Elasticsearch logging options',
-              properties: {
-                level: logLevelSchema,
-                endpoint: {
-                  description: 'Elastichsearch endpoint to push the logs to (for example: http://localhost:9200)',
-                  type: 'string',
-                },
-              },
-              required: ['level', 'endpoint'],
-            }),
-            offSwitch,
-          ],
-        },
+        file: objectSchema({
+          title: 'File logging options',
+          properties: {
+            level: logLevelSchema,
+            path: {
+              description: 'Path where the logs will be stored (absolute or relative to config file)',
+              type: 'string',
+            },
+            maxFiles: {
+              description: 'Maximum number of log files to store',
+              type: 'integer',
+              minimum: 1,
+            },
+            maxSize: {
+              description: 'Maximum size of a single log file in bytes',
+              type: 'integer',
+              minimum: 1024,
+            },
+            frequency: {
+              description: 'The frequency of creating new log files (regardless of maxSize)',
+              default: 'daily',
+              type: 'string',
+              enum: ['yearly', 'monthly', 'daily', 'hourly'],
+            },
+            archive: {
+              description: 'Whether to archive old logs',
+              default: false,
+              type: 'boolean',
+            },
+          },
+          required: ['level', 'path'],
+        }),
+        console: objectSchema({
+          title: 'Console logging options',
+          properties: { level: logLevelSchema },
+          required: ['level'],
+        }),
+        elastic: objectSchema({
+          title: 'Elasticsearch logging options',
+          properties: {
+            level: logLevelSchema,
+            endpoint: {
+              description: 'Elasticsearch endpoint to push the logs to (for example: http://localhost:9200)',
+              type: 'string',
+            },
+          },
+          required: ['level', 'endpoint'],
+        }),
       },
       required: [],
     }),
@@ -256,22 +235,13 @@ export const configSchema: JSONSchema4 = objectSchema({
       minItems: 1,
     },
     buckets: {
-      description: 'Specifies the buckets distributed by the node',
-      oneOf: [
-        {
-          title: 'Bucket ids',
-          description: 'List of distribution bucket ids',
-          type: 'array',
-          items: { type: 'integer', minimum: 0 },
-          minItems: 1,
-        },
-        {
-          title: 'All buckets',
-          description: 'Distribute all buckets assigned to worker specified in `workerId`',
-          type: 'string',
-          enum: ['all'],
-        },
-      ],
+      description:
+        'Set of bucket ids distributed by the node. If not specified, all buckets currently assigned to worker specified in `config.workerId` will be distributed.',
+      title: 'Bucket ids',
+      type: 'array',
+      uniqueItems: true,
+      items: { type: 'integer', minimum: 0 },
+      minItems: 1,
     },
     workerId: {
       description: 'ID of the node operator (distribution working group worker)',

+ 7 - 4
distributor-node/src/services/httpApi/controllers/operator.ts

@@ -66,10 +66,13 @@ export class OperatorApiController {
     res: express.Response
   ): Promise<void> {
     const { buckets } = req.body
-    this.logger.info(`Updating buckets to ${JSON.stringify(buckets)} on operator request from ${req.ip}`, {
-      buckets,
-      ip: req.ip,
-    })
+    this.logger.info(
+      `Updating buckets to ${buckets ? JSON.stringify(buckets) : '"all"'} on operator request from ${req.ip}`,
+      {
+        buckets,
+        ip: req.ip,
+      }
+    )
     this.config.buckets = buckets
     res.status(200).send()
   }

+ 4 - 2
distributor-node/src/services/httpApi/controllers/public.ts

@@ -294,9 +294,11 @@ export class PublicApiController {
     res
       .status(200)
       .json(
-        this.config.buckets === 'all'
+        this.config.buckets
+          ? { bucketIds: [...this.config.buckets] }
+          : this.config.workerId
           ? { allByWorkerId: this.config.workerId }
-          : { bucketIds: [...this.config.buckets] }
+          : { bucketIds: [] }
       )
   }
 }

+ 3 - 3
distributor-node/src/services/logging/LoggingService.ts

@@ -76,7 +76,7 @@ export class LoggingService {
     const transports: winston.LoggerOptions['transports'] = []
 
     let esTransport: ElasticsearchTransport | undefined
-    if (config.logs?.elastic && config.logs.elastic !== 'off') {
+    if (config.logs?.elastic) {
       esTransport = new ElasticsearchTransport({
         index: 'distributor-node',
         level: config.logs.elastic.level,
@@ -92,7 +92,7 @@ export class LoggingService {
       transports.push(esTransport)
     }
 
-    if (config.logs?.file && config.logs.file !== 'off') {
+    if (config.logs?.file) {
       const datePatternByFrequency = {
         yearly: 'YYYY',
         monthly: 'YYYY-MM',
@@ -111,7 +111,7 @@ export class LoggingService {
       transports.push(fileTransport)
     }
 
-    if (config.logs?.console && config.logs.console !== 'off') {
+    if (config.logs?.console) {
       const consoleTransport = new winston.transports.Console({
         level: config.logs.console.level,
         format: winston.format.combine(pauseFormat({ id: 'cli' }), cliFormat),

+ 7 - 6
distributor-node/src/services/networking/NetworkingService.ts

@@ -135,9 +135,9 @@ export class NetworkingService {
     let data: DataObjectData | undefined
     if (details) {
       exists = true
-      if (this.config.buckets === 'all') {
+      if (!this.config.buckets) {
         const distributors = this.getDataObjectActiveDistributorsSet(details)
-        isSupported = distributors.has(this.config.workerId)
+        isSupported = this.config.workerId ? distributors.has(this.config.workerId) : false
       } else {
         const supportedBucketIds = this.config.buckets.map((id) => id.toString())
         isSupported = details.storageBag.distirbutionAssignments.some((a) =>
@@ -358,10 +358,11 @@ export class NetworkingService {
   }
 
   async fetchSupportedDataObjects(): Promise<Map<string, DataObjectData>> {
-    const data =
-      this.config.buckets === 'all'
-        ? await this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
-        : await this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString()))
+    const data = this.config.buckets
+      ? await this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString()))
+      : this.config.workerId
+      ? await this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
+      : []
     const objectsData = new Map<string, DataObjectData>()
     data.forEach((bucket) => {
       bucket.bagAssignments.forEach((a) => {

+ 29 - 25
distributor-node/src/services/parsers/ConfigParserService.ts

@@ -28,7 +28,7 @@ export class ConfigParserService {
   }
 
   public resolveConfigKeysPaths(keys: Config['keys']): Config['keys'] {
-    return keys.map((k) => ('keyfile' in k ? { keyfile: this.resolvePath(k.keyfile) } : k))
+    return keys?.map((k) => ('keyfile' in k ? { keyfile: this.resolvePath(k.keyfile) } : k))
   }
 
   private parseBytesize(bytesize: string) {
@@ -71,30 +71,34 @@ export class ConfigParserService {
 
     for (const i in possibleTypes) {
       try {
-        switch (possibleTypes[i]) {
-          case undefined:
-            // Invalid key - skip
-            break
-          case 'integer':
-            _.set(config, path, parseInt(envValue || ''))
-            break
-          case 'number':
-            _.set(config, path, parseFloat(envValue || ''))
-            break
-          case 'boolean':
-            _.set(config, path, !!envValue)
-            break
-          case 'array':
-          case 'object':
-            try {
-              const parsed = JSON.parse(envValue || 'undefined')
-              _.set(config, path, parsed)
-            } catch (e) {
-              throw new ValidationError(`Invalid env value of ${envKey}: Not a valid JSON`, null)
-            }
-            break
-          default:
-            _.set(config, path, envValue)
+        if (envValue === 'off' || envValue === 'null' || envValue === 'undefined') {
+          _.set(config, path, undefined)
+        } else {
+          switch (possibleTypes[i]) {
+            case undefined:
+              // Invalid key - skip
+              break
+            case 'integer':
+              _.set(config, path, parseInt(envValue || ''))
+              break
+            case 'number':
+              _.set(config, path, parseFloat(envValue || ''))
+              break
+            case 'boolean':
+              _.set(config, path, !!envValue)
+              break
+            case 'array':
+            case 'object':
+              try {
+                const parsed = JSON.parse(envValue || 'undefined')
+                _.set(config, path, parsed)
+              } catch (e) {
+                throw new ValidationError(`Invalid env value of ${envKey}: Not a valid JSON`, null)
+              }
+              break
+            default:
+              _.set(config, path, envValue)
+          }
         }
         const errors = this.validator.errorsByProperty('Config', path.join('.'), config)
         if (errors) {

+ 7 - 15
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -5,15 +5,10 @@
  * and run json-schema-to-typescript to regenerate this file.
  */
 
-export type SwitchOff = 'off'
 /**
- * List of distribution bucket ids
+ * Set of bucket ids distributed by the node. If not specified, all buckets currently assigned to worker specified in `config.workerId` will be distributed.
  */
 export type BucketIds = number[]
-/**
- * Distribute all buckets assigned to worker specified in `workerId`
- */
-export type AllBuckets = 'all'
 
 /**
  * Configuration schema for distirubtor CLI and node
@@ -53,9 +48,9 @@ export interface DistributorNodeConfiguration {
    * Specifies the logging configuration
    */
   logs?: {
-    file?: FileLoggingOptions | SwitchOff
-    console?: ConsoleLoggingOptions | SwitchOff
-    elastic?: ElasticsearchLoggingOptions | SwitchOff
+    file?: FileLoggingOptions
+    console?: ConsoleLoggingOptions
+    elastic?: ElasticsearchLoggingOptions
   }
   /**
    * Specifies node limits w.r.t. storage, outbound connections etc.
@@ -132,15 +127,12 @@ export interface DistributorNodeConfiguration {
   /**
    * Specifies the keys available within distributor node CLI.
    */
-  keys: (SubstrateUri | MnemonicPhrase | JSONBackupFile)[]
-  /**
-   * Specifies the buckets distributed by the node
-   */
-  buckets: BucketIds | AllBuckets
+  keys?: (SubstrateUri | MnemonicPhrase | JSONBackupFile)[]
+  buckets?: BucketIds
   /**
    * ID of the node operator (distribution working group worker)
    */
-  workerId: number
+  workerId?: number
 }
 export interface FileLoggingOptions {
   /**

+ 2 - 1
distributor-node/src/types/generated/OperatorApi.ts

@@ -32,7 +32,8 @@ export interface components {
       'workerId': number
     }
     'SetBucketsOperation': {
-      'buckets': 'all' | number[]
+      /** Set of bucket ids to be distributed by the node. If not provided - all buckets assigned to currently configured worker will be distributed. */
+      'buckets'?: number[]
     }
   }
 }

+ 2 - 1
docker-compose.yml

@@ -70,7 +70,8 @@ services:
     #   JOYSTREAM_DISTRIBUTOR__DIRECTORIES__ASSETS: assets-dir
     #   JOYSTREAM_DISTRIBUTOR__DIRECTORIES__CACHE_STATE: cache-state-dir
     #   JOYSTREAM_DISTRIBUTOR__LOGS__CONSOLE: "off"
-    #   JOYSTREAM_DISTRIBUTOR__LOGS__FILE: "{\"level\":\"debug\",\"path\":\"/tmp\"}"
+    #   JOYSTREAM_DISTRIBUTOR__LOGS__FILE__LEVEL: debug
+    #   JOYSTREAM_DISTRIBUTOR__LOGS__FILE__PATH: /tmp
     #   JOYSTREAM_DISTRIBUTOR__LOGS__ELASTIC: "off"
     #   JOYSTREAM_DISTRIBUTOR__LIMITS__STORAGE: 50G
     #   JOYSTREAM_DISTRIBUTOR__PORT: 1234