Browse Source

storage-node-v2: Fix review comments.

- fix objects by bags caching
- disable auth-token functionality
- remove obsolete flags from the ‘update-bucket-status’
- added logSource to the elastic search
- removed redundant query to the query node (fixed paging)
- changed request error handling
-
Shamil Gadelshin 3 years ago
parent
commit
bcb6542369

+ 0 - 44
storage-node-v2/src/api-spec/openapi.yaml

@@ -97,8 +97,6 @@ paths:
           description: Unknown server error
   /files:
     post:
-      security:
-        - UploadAuth: []
       description: Upload data
       operationId: filesApi.uploadFile
       tags:
@@ -145,43 +143,6 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/ErrorResponse'
-        401:
-          description: Unauthorized
-
-  /authToken:
-    post:
-      description: Get auth token from a server.
-      operationId: filesApi.authTokenForUploading
-      tags:
-        - files
-      requestBody:
-        description: Token request parameters,
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/TokenRequest'
-      responses:
-        201:
-          description: Created
-          content:
-            application/json:
-              schema:
-                type: object
-                properties:
-                  token:
-                    type: string
-        400:
-          description: Bad request
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/ErrorResponse'
-        401:
-          description: Unauthorized
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/ErrorResponse'
 
   /state/data-objects:
     get:
@@ -246,11 +207,6 @@ paths:
                 $ref: '#/components/schemas/DataStatsResponse'
 
 components:
-  securitySchemes:
-    UploadAuth:
-      type: apiKey
-      in: header
-      name: x-api-key
   schemas:
     TokenRequest:
       type: object

+ 0 - 8
storage-node-v2/src/commands/leader/update-bucket-status.ts

@@ -20,14 +20,6 @@ export default class LeaderUpdateStorageBucketStatus extends ApiCommandBase {
       required: true,
       description: 'Storage bucket ID',
     }),
-    enable: flags.boolean({
-      char: 'e',
-      description: 'Enables accepting new bags (default).',
-    }),
-    disable: flags.boolean({
-      char: 'd',
-      description: 'Disables accepting new bags.',
-    }),
     set: flags.enum({
       char: 's',
       description: `Sets 'accepting new bags' parameter for the bucket (on/off).`,

+ 5 - 11
storage-node-v2/src/commands/server.ts

@@ -71,11 +71,6 @@ export default class Server extends ApiCommandBase {
 Log level could be set using the ELASTIC_LOG_LEVEL enviroment variable.
 Supported values: warn, error, debug, info. Default:debug`,
     }),
-    disableUploadAuth: flags.boolean({
-      char: 'a',
-      description: 'Disable uploading authentication (should be used in testing-context only).',
-      default: false,
-    }),
     ...ApiCommandBase.flags,
   }
 
@@ -84,12 +79,14 @@ Supported values: warn, error, debug, info. Default:debug`,
 
     await recreateTempDirectory(flags.uploads, TempDirName)
 
+    const logSource = `StorageProvider_${flags.worker}`
+
     if (fs.existsSync(flags.uploads)) {
       await loadDataObjectIdCache(flags.uploads, TempDirName)
     }
 
     if (!_.isEmpty(flags.elasticSearchEndpoint)) {
-      initElasticLogger(flags.elasticSearchEndpoint ?? '')
+      initElasticLogger(logSource, flags.elasticSearchEndpoint ?? '')
     }
 
     logger.info(`Query node endpoint set: ${flags.queryNodeEndpoint}`)
@@ -98,10 +95,6 @@ Supported values: warn, error, debug, info. Default:debug`,
       await this.ensureDevelopmentChain()
     }
 
-    if (flags.disableUploadAuth) {
-      logger.warn(`Uploading auth-schema disabled.`)
-    }
-
     const api = await this.getApi()
 
     if (flags.sync) {
@@ -141,8 +134,9 @@ Supported values: warn, error, debug, info. Default:debug`,
         tempFileUploadingDir,
         process: this.config,
         queryNodeEndpoint: flags.queryNodeEndpoint,
-        enableUploadingAuth: !flags.disableUploadAuth,
+        enableUploadingAuth: false,
         elasticSearchEndpoint: flags.elasticSearchEndpoint,
+        logSource,
       })
       logger.info(`Listening on http://localhost:${port}`)
       app.listen(port)

+ 13 - 9
storage-node-v2/src/services/logger.ts

@@ -84,12 +84,12 @@ export default proxy
 
 /**
  * Creates Express-Winston logger handler.
- *
+ * @param logSource - source tag for log entries.
  * @param elasticSearchEndpoint - elastic search engine endpoint (optional).
  * @returns  Express-Winston logger handler
  *
  */
-export function httpLogger(elasticSearchEndpoint?: string): Handler {
+export function httpLogger(logSource: string, elasticSearchEndpoint?: string): Handler {
   // ElasticSearch server date format.
   const elasticDateFormat = 'YYYY-MM-DDTHH:mm:ss'
 
@@ -100,7 +100,7 @@ export function httpLogger(elasticSearchEndpoint?: string): Handler {
   ]
 
   if (elasticSearchEndpoint) {
-    const esTransport = createElasticTransport(elasticSearchEndpoint)
+    const esTransport = createElasticTransport(logSource, elasticSearchEndpoint)
     transports.push(esTransport)
   }
 
@@ -147,11 +147,12 @@ export function createStdConsoleLogger(): winston.Logger {
 }
 /**
  * Creates Winston logger with Elastic search.
- *
+ * @param logSource - source tag for log entries.
+ * @param elasticSearchEndpoint - elastic search engine endpoint.
  * @returns Winston logger
  *
  */
-function createElasticLogger(elasticSearchEndpoint: string): winston.Logger {
+function createElasticLogger(logSource: string, elasticSearchEndpoint: string): winston.Logger {
   const loggerOptions = createDefaultLoggerOptions()
 
   // Transports
@@ -160,7 +161,7 @@ function createElasticLogger(elasticSearchEndpoint: string): winston.Logger {
     transports = Array.isArray(loggerOptions.transports) ? loggerOptions.transports : [loggerOptions.transports]
   }
 
-  const esTransport = createElasticTransport(elasticSearchEndpoint)
+  const esTransport = createElasticTransport(logSource, elasticSearchEndpoint)
   transports.push(esTransport)
 
   // Logger
@@ -179,19 +180,21 @@ function createElasticLogger(elasticSearchEndpoint: string): winston.Logger {
 /**
  * Updates the default system logger with elastic search capabilities.
  *
+ * @param logSource - source tag for log entries.
  * @param elasticSearchEndpoint - elastic search engine endpoint.
  */
-export function initElasticLogger(elasticSearchEndpoint: string): void {
-  InnerLogger = createElasticLogger(elasticSearchEndpoint)
+export function initElasticLogger(logSource: string, elasticSearchEndpoint: string): void {
+  InnerLogger = createElasticLogger(logSource, elasticSearchEndpoint)
 }
 
 /**
  * Creates winston logger transport for the elastic search engine.
  *
+ * @param logSource - source tag for log entries.
  * @param elasticSearchEndpoint - elastic search engine endpoint.
  * @returns elastic search winston transport
  */
-function createElasticTransport(elasticSearchEndpoint: string): winston.transport {
+function createElasticTransport(logSource: string, elasticSearchEndpoint: string): winston.transport {
   const possibleLevels = ['warn', 'error', 'debug', 'info']
 
   let elasticLogLevel = process.env.ELASTIC_LOG_LEVEL ?? ''
@@ -206,6 +209,7 @@ function createElasticTransport(elasticSearchEndpoint: string): winston.transpor
     clientOpts: { node: elasticSearchEndpoint, maxRetries: 5 },
     index: 'storage-node',
     format: ecsformat(),
+    source: logSource,
   }
   return new ElasticsearchTransport(esTransportOpts)
 }

+ 18 - 8
storage-node-v2/src/services/sync/storageObligations.ts

@@ -1,5 +1,6 @@
 import { MAX_RESULTS_PER_QUERY, QueryNodeApi } from '../queryNode/api'
 import logger from '../logger'
+import _ from 'lodash'
 import {
   StorageBagDetailsFragment,
   StorageBucketDetailsFragment,
@@ -159,10 +160,15 @@ async function getAllBuckets(api: QueryNodeApi): Promise<StorageBucketDetailsFra
   const idFragments = await api.getStorageBucketIds()
   const ids = idFragments.map((frag) => frag.id)
 
-  return await getAllObjectsWithPaging(
-    'get all storage buckets',
-    async (offset, limit) => await api.getStorageBucketDetails(ids.slice(offset, offset + limit), 0, limit)
-  )
+  return await getAllObjectsWithPaging(async (offset, limit) => {
+    const idsPart = ids.slice(offset, offset + limit)
+    if (!_.isEmpty(idsPart)) {
+      logger.debug(`Sync - getting all storage buckets: offset = ${offset}, limit = ${limit}`)
+      return await api.getStorageBucketDetails(idsPart, 0, limit)
+    } else {
+      return false
+    }
+  })
 }
 
 /**
@@ -196,8 +202,7 @@ async function getAllAssignedBags(api: QueryNodeApi, bucketIds: string[]): Promi
  * @returns storage operator URL
  */
 async function getAllObjectsWithPaging<T>(
-  objectName: string,
-  query: (offset: number, limit: number) => Promise<T[]>
+  query: (offset: number, limit: number) => Promise<T[] | false>
 ): Promise<T[]> {
   const result = []
   const limit = MAX_RESULTS_PER_QUERY
@@ -205,8 +210,13 @@ async function getAllObjectsWithPaging<T>(
 
   let resultPart = []
   do {
-    logger.debug(`Sync - getting ${objectName}: offset = ${offset}, limit = ${limit}`)
-    resultPart = await query(offset, limit)
+    const queryResult = await query(offset, limit)
+    if (queryResult === false) {
+      return result
+    } else {
+      resultPart = queryResult
+    }
+
     offset += limit
     result.push(...resultPart)
   } while (resultPart.length > 0)

+ 1 - 1
storage-node-v2/src/services/webApi/app.ts

@@ -23,7 +23,7 @@ export async function createApp(config: AppConfig): Promise<Express> {
 
   app.use(cors())
   app.use(express.json())
-  app.use(httpLogger(config.elasticSearchEndpoint))
+  app.use(httpLogger(config.logSource, config.elasticSearchEndpoint))
 
   app.use(
     // Set parameters for each request.

+ 14 - 5
storage-node-v2/src/services/webApi/controllers/common.ts

@@ -1,15 +1,13 @@
 import * as express from 'express'
-import { CLIError } from '@oclif/errors'
 import { ExtrinsicFailedError } from '../../runtime/api'
 import { BagIdValidationError } from '../../helpers/bagTypes'
-import logger from '../../logger'
 import { ApiPromise } from '@polkadot/api'
 import { KeyringPair } from '@polkadot/keyring/types'
 
 /**
  * Dedicated error for the web api requests.
  */
-export class WebApiError extends CLIError {
+export class WebApiError extends Error {
   httpStatusCode: number
 
   constructor(err: string, httpStatusCode: number) {
@@ -36,14 +34,20 @@ export class ServerError extends WebApiError {
  * @param errorType - defines request type
  * @returns void promise.
  */
-export function sendResponseWithError(res: express.Response, err: Error, errorType: string): void {
+export function sendResponseWithError(
+  res: express.Response,
+  next: express.NextFunction,
+  err: Error,
+  errorType: string
+): void {
   const message = isNofileError(err) ? `File not found.` : err.toString()
-  logger.error(message, { err })
 
   res.status(getHttpStatusCodeByError(err)).json({
     type: errorType,
     message,
   })
+
+  next(err)
 }
 
 /**
@@ -128,6 +132,11 @@ export type AppConfig = {
    */
   enableUploadingAuth: boolean
 
+  /**
+   * Source tag for log entries for ElasticSearch.
+   */
+  logSource: string
+
   /**
    * ElasticSearch logging endpoint URL
    */

+ 16 - 7
storage-node-v2/src/services/webApi/controllers/filesApi.ts

@@ -29,7 +29,11 @@ const fsPromises = fs.promises
 /**
  * A public endpoint: serves files by data object ID.
  */
-export async function getFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
+export async function getFile(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>,
+  next: express.NextFunction
+): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
     const uploadsDir = res.locals.uploadsDir
@@ -48,12 +52,12 @@ export async function getFile(req: express.Request, res: express.Response<unknow
     })
 
     stream.on('error', (err) => {
-      sendResponseWithError(res, err, 'files')
+      sendResponseWithError(res, next, err, 'files')
     })
 
     stream.pipe(res)
   } catch (err) {
-    sendResponseWithError(res, err, 'files')
+    sendResponseWithError(res, next, err, 'files')
   }
 }
 
@@ -81,7 +85,11 @@ export async function getFileHeaders(req: express.Request, res: express.Response
 /**
  * A public endpoint: receives file.
  */
-export async function uploadFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
+export async function uploadFile(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>,
+  next: express.NextFunction
+): Promise<void> {
   const uploadRequest: RequestData = req.body
 
   // saved filename to delete on verification or extrinsic errors
@@ -134,7 +142,7 @@ export async function uploadFile(req: express.Request, res: express.Response<unk
   } catch (err) {
     await cleanupFileOnError(cleanupFileName, err.toString())
 
-    sendResponseWithError(res, err, 'upload')
+    sendResponseWithError(res, next, err, 'upload')
   }
 }
 
@@ -143,7 +151,8 @@ export async function uploadFile(req: express.Request, res: express.Response<unk
  */
 export async function authTokenForUploading(
   req: express.Request,
-  res: express.Response<unknown, AppConfig>
+  res: express.Response<unknown, AppConfig>,
+  next: express.NextFunction
 ): Promise<void> {
   try {
     const account = res.locals.storageProviderAccount
@@ -163,7 +172,7 @@ export async function authTokenForUploading(
       token: signedToken,
     })
   } catch (err) {
-    sendResponseWithError(res, err, 'authtoken')
+    sendResponseWithError(res, next, err, 'authtoken')
   }
 }
 

+ 16 - 17
storage-node-v2/src/services/webApi/controllers/stateApi.ts

@@ -23,14 +23,15 @@ const dataCache = new NodeCache({
  */
 export async function getAllLocalDataObjects(
   req: express.Request,
-  res: express.Response<unknown, AppConfig>
+  res: express.Response<unknown, AppConfig>,
+  next: express.NextFunction
 ): Promise<void> {
   try {
     const ids = await getDataObjectIDs()
 
     res.status(200).json(ids)
   } catch (err) {
-    sendResponseWithError(res, err, 'all_data_objects')
+    sendResponseWithError(res, next, err, 'all_data_objects')
   }
 }
 
@@ -41,7 +42,8 @@ export async function getAllLocalDataObjects(
  */
 export async function getLocalDataStats(
   req: express.Request,
-  res: express.Response<unknown, AppConfig>
+  res: express.Response<unknown, AppConfig>,
+  next: express.NextFunction
 ): Promise<void> {
   try {
     const uploadsDir = res.locals.uploadsDir
@@ -78,7 +80,7 @@ export async function getLocalDataStats(
       tempDirSize,
     })
   } catch (err) {
-    sendResponseWithError(res, err, 'local_data_stats')
+    sendResponseWithError(res, next, err, 'local_data_stats')
   }
 }
 
@@ -87,7 +89,8 @@ export async function getLocalDataStats(
  */
 export async function getLocalDataObjectsByBagId(
   req: express.Request,
-  res: express.Response<unknown, AppConfig>
+  res: express.Response<unknown, AppConfig>,
+  next: express.NextFunction
 ): Promise<void> {
   try {
     const queryNodeUrl = res.locals.queryNodeEndpoint
@@ -102,7 +105,7 @@ export async function getLocalDataObjectsByBagId(
 
     res.status(200).json(localDataForBag)
   } catch (err) {
-    sendResponseWithError(res, err, 'data_objects_by_bag')
+    sendResponseWithError(res, next, err, 'data_objects_by_bag')
   }
 }
 
@@ -110,17 +113,13 @@ export async function getLocalDataObjectsByBagId(
  * A public endpoint: return the server version.
  */
 export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
-  try {
-    const config = res.locals.process
+  const config = res.locals.process
 
-    // Copy from an object, because the actual object could contain more data.
-    res.status(200).json({
-      version: config.version,
-      userAgent: config.userAgent,
-    })
-  } catch (err) {
-    sendResponseWithError(res, err, 'version')
-  }
+  // Copy from an object, because the actual object could contain more data.
+  res.status(200).json({
+    version: config.version,
+    userAgent: config.userAgent,
+  })
 }
 
 /**
@@ -145,7 +144,7 @@ function getBagId(req: express.Request): string {
  *
  */
 async function getCachedDataObjectsObligations(queryNodeUrl: string, bagId: string): Promise<string[]> {
-  const entryName = 'data_object_obligations'
+  const entryName = `data_object_obligations_${bagId}`
 
   if (!dataCache.has(entryName)) {
     const data = await getDataObjectIDsByBagId(queryNodeUrl, bagId)