Browse Source

storage-node-v2: Fix review comments.

- add workerId filter for buckets
- add AppConfig types for responses
- fix bug with deleting local files
Shamil Gadelshin 3 years ago
parent
commit
2df9a205d5

+ 5 - 4
storage-node-v2/src/commands/server.ts

@@ -117,23 +117,24 @@ Supported values: warn, error, debug, info. Default:debug`,
       )
     }
 
-    const account = this.getAccount(flags)
+    const storageProviderAccount = this.getAccount(flags)
 
-    await verifyWorkerId(api, flags.worker, account)
+    await verifyWorkerId(api, flags.worker, storageProviderAccount)
 
     try {
       const port = flags.port
       const workerId = flags.worker
       const maxFileSize = await api.consts.storage.maxDataObjectSize.toNumber()
+      const tempFileUploadingDir = path.join(flags.uploads, TempDirName)
       logger.debug(`Max file size runtime parameter: ${maxFileSize}`)
 
       const app = await createApp({
         api,
-        account,
+        storageProviderAccount,
         workerId,
         maxFileSize,
         uploadsDir: flags.uploads,
-        tempDirName: TempDirName,
+        tempFileUploadingDir,
         process: this.config,
         queryNodeEndpoint: flags.queryNodeEndpoint,
         enableUploadingAuth: !flags.disableUploadAuth,

+ 35 - 0
storage-node-v2/src/services/caching/newUploads.ts

@@ -0,0 +1,35 @@
+import NodeCache from 'node-cache'
+
+// Expiration period in seconds for the new uploads data.
+const ExpirationPeriod = 3600 // seconds (1 hour)
+
+// Max ID number in local cache
+const MaxEntries = 100000
+
+// Local in-memory cache for new data objects.
+const newDataObjects = new NodeCache({
+  stdTTL: ExpirationPeriod,
+  deleteOnExpire: true,
+  maxKeys: MaxEntries,
+})
+
+/**
+ * Adds a data object ID to the cache for new data objects with expiration time.
+ *
+ * * @param dataObjectId - data object ID.
+ *
+ * @returns nonce string.
+ */
+export function registerNewDataObjectId(dataObjectId: string) {
+  newDataObjects.set(dataObjectId, null, ExpirationPeriod)
+}
+
+/**
+ * Verifies that a data object with provided ID was recently uploaded .
+ *
+ * @param dataObjectId - data object ID.
+ * @returns true if ID was present in local cache.
+ */
+export function isNewDataObject(dataObjectId: string): boolean {
+  return newDataObjects.has(dataObjectId)
+}

+ 0 - 0
storage-node-v2/src/services/helpers/tokenNonceKeeper.ts → storage-node-v2/src/services/caching/tokenNonceKeeper.ts


+ 14 - 2
storage-node-v2/src/services/sync/synchronizer.ts

@@ -56,7 +56,15 @@ export async function performSync(
 
   let addedTasks: SyncTask[]
   if (operatorUrl === undefined) {
-    addedTasks = await getPrepareDownloadTasks(api, model, added, uploadDirectory, tempDirectory, workingStack)
+    addedTasks = await getPrepareDownloadTasks(
+      api,
+      model,
+      workerId,
+      added,
+      uploadDirectory,
+      tempDirectory,
+      workingStack
+    )
   } else {
     addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory)
   }
@@ -85,6 +93,7 @@ export async function performSync(
 async function getPrepareDownloadTasks(
   api: ApiPromise | undefined,
   dataObligations: DataObligations,
+  currentWorkerId: number,
   addedIds: string[],
   uploadDirectory: string,
   tempDirectory: string,
@@ -97,7 +106,10 @@ async function getPrepareDownloadTasks(
 
   const bucketOperatorUrlById = new Map()
   for (const entry of dataObligations.storageBuckets) {
-    bucketOperatorUrlById.set(entry.id, entry.operatorUrl)
+    // Skip all buckets of the current WorkerId (this storage provider)
+    if (entry.workerId !== currentWorkerId) {
+      bucketOperatorUrlById.set(entry.id, entry.operatorUrl)
+    }
   }
 
   const bagOperatorsUrlsById = new Map()

+ 7 - 0
storage-node-v2/src/services/sync/tasks.ts

@@ -9,6 +9,7 @@ import logger from '../../services/logger'
 import _ from 'lodash'
 import { getRemoteDataObjects } from './remoteStorageData'
 import { TaskSink } from './workingProcess'
+import { isNewDataObject } from '../caching/newUploads'
 import { hashFile } from '../helpers/hashing'
 import { parseBagId } from '../helpers/bagTypes'
 import { hexToString } from '@polkadot/util'
@@ -47,6 +48,12 @@ export class DeleteLocalFileTask implements SyncTask {
   }
 
   async execute(): Promise<void> {
+    const dataObjectId = this.filename
+    if (isNewDataObject(dataObjectId)) {
+      logger.warn('Sync - possible QueryNode update delay (new file) - deleting file canceled: ${this.filename}')
+      return
+    }
+
     const fullPath = path.join(this.uploadsDirectory, this.filename)
     return fsPromises.unlink(fullPath)
   }

+ 6 - 69
storage-node-v2/src/services/webApi/app.ts

@@ -7,65 +7,9 @@ import { HttpError, OpenAPIV3, ValidateSecurityOpts } from 'express-openapi-vali
 import { KeyringPair } from '@polkadot/keyring/types'
 import { ApiPromise } from '@polkadot/api'
 import { RequestData, verifyTokenSignature, parseUploadToken, UploadToken } from '../helpers/auth'
-import { checkRemoveNonce } from '../../services/helpers/tokenNonceKeeper'
+import { checkRemoveNonce } from '../caching/tokenNonceKeeper'
 import { httpLogger, errorLogger } from '../../services/logger'
-
-/**
- * Web application parameters.
- */
-export type AppConfig = {
-  /**
-   * Runtime API promise
-   */
-  api: ApiPromise
-
-  /**
-   * KeyringPair instance
-   */
-  account: KeyringPair
-
-  /**
-   * Storage provider ID (worker ID)
-   */
-  workerId: number
-
-  /**
-   * Directory for the file uploading
-   */
-  uploadsDir: string
-  /**
-   * Directory within the `uploadsDir` for temporary file uploading
-   */
-  tempDirName: string
-
-  /**
-   *  Environment configuration
-   */
-  process: {
-    version: string
-    userAgent: string
-  }
-
-  /**
-   * Query Node endpoint URL
-   */
-  queryNodeEndpoint: string
-
-  /**
-   * Enables uploading auth-schema validation
-   */
-  enableUploadingAuth: boolean
-
-  /**
-   * ElasticSearch logging endpoint URL
-   */
-  elasticSearchEndpoint?: string
-
-  /**
-   * Max file size for uploading limit.
-   */
-  maxFileSize: number
-}
+import { AppConfig } from './controllers/common'
 
 /**
  * Creates Express web application. Uses the OAS spec file for the API.
@@ -75,8 +19,6 @@ export type AppConfig = {
  */
 export async function createApp(config: AppConfig): Promise<Express> {
   const spec = path.join(__dirname, './../../api-spec/openapi.yaml')
-  const tempFileUploadingDir = path.join(config.uploadsDir, config.tempDirName)
-
   const app = express()
 
   app.use(cors())
@@ -86,13 +28,8 @@ export async function createApp(config: AppConfig): Promise<Express> {
   app.use(
     // Set parameters for each request.
     (req: express.Request, res: express.Response, next: NextFunction) => {
-      res.locals.uploadsDir = config.uploadsDir
-      res.locals.tempFileUploadingDir = tempFileUploadingDir
-      res.locals.storageProviderAccount = config.account
-      res.locals.workerId = config.workerId
-      res.locals.api = config.api
-      res.locals.config = config.process
-      res.locals.queryNodeUrl = config.queryNodeEndpoint
+      res.locals = config
+
       next()
     },
     // Setup OpenAPiValidator
@@ -106,7 +43,7 @@ export async function createApp(config: AppConfig): Promise<Express> {
         resolver: OpenApiValidator.resolvers.modulePathResolver,
       },
       fileUploader: {
-        dest: tempFileUploadingDir,
+        dest: config.tempFileUploadingDir,
         // Busboy library settings
         limits: {
           // For multipart forms, the max number of file fields (Default: Infinity)
@@ -115,7 +52,7 @@ export async function createApp(config: AppConfig): Promise<Express> {
           fileSize: config.maxFileSize,
         },
       },
-      validateSecurity: setupUploadingValidation(config.enableUploadingAuth, config.api, config.account),
+      validateSecurity: setupUploadingValidation(config.enableUploadingAuth, config.api, config.storageProviderAccount),
     })
   ) // Required signature.
 

+ 68 - 9
storage-node-v2/src/services/webApi/controllers/common.ts

@@ -3,6 +3,8 @@ import { CLIError } from '@oclif/errors'
 import { ExtrinsicFailedError } from '../../runtime/api'
 import { BagIdValidationError } from '../../helpers/bagTypes'
 import logger from '../../logger'
+import { ApiPromise } from '@polkadot/api'
+import { KeyringPair } from '@polkadot/keyring/types'
 
 /**
  * Dedicated error for the web api requests.
@@ -33,7 +35,7 @@ export class ServerError extends WebApiError {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getUploadsDir(res: express.Response): string {
+export function getUploadsDir(res: express.Response<unknown, AppConfig>): string {
   if (res.locals.uploadsDir) {
     return res.locals.uploadsDir
   }
@@ -48,7 +50,7 @@ export function getUploadsDir(res: express.Response): string {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getTempFileUploadingDir(res: express.Response): string {
+export function getTempFileUploadingDir(res: express.Response<unknown, AppConfig>): string {
   if (res.locals.tempFileUploadingDir) {
     return res.locals.tempFileUploadingDir
   }
@@ -63,7 +65,7 @@ export function getTempFileUploadingDir(res: express.Response): string {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getWorkerId(res: express.Response): number {
+export function getWorkerId(res: express.Response<unknown, AppConfig>): number {
   if (res.locals.workerId || res.locals.workerId === 0) {
     return res.locals.workerId
   }
@@ -78,9 +80,9 @@ export function getWorkerId(res: express.Response): number {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getQueryNodeUrl(res: express.Response): string {
-  if (res.locals.queryNodeUrl) {
-    return res.locals.queryNodeUrl
+export function getQueryNodeUrl(res: express.Response<unknown, AppConfig>): string {
+  if (res.locals.queryNodeEndpoint) {
+    return res.locals.queryNodeEndpoint
   }
 
   throw new ServerError('No Query Node URL loaded.')
@@ -93,12 +95,12 @@ export function getQueryNodeUrl(res: express.Response): string {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getCommandConfig(res: express.Response): {
+export function getCommandConfig(res: express.Response<unknown, AppConfig>): {
   version: string
   userAgent: string
 } {
-  if (res.locals.config) {
-    return res.locals.config
+  if (res.locals.process) {
+    return res.locals.process
   }
 
   throw new ServerError('Cannot load command config.')
@@ -157,3 +159,60 @@ export function getHttpStatusCodeByError(err: Error): number {
 
   return 500
 }
+
+/**
+ * Web application parameters.
+ */
+export type AppConfig = {
+  /**
+   * Runtime API promise
+   */
+  api: ApiPromise
+
+  /**
+   * KeyringPair instance
+   */
+  storageProviderAccount: KeyringPair
+
+  /**
+   * Storage provider ID (worker ID)
+   */
+  workerId: number
+
+  /**
+   * Directory for the file uploading
+   */
+  uploadsDir: string
+  /**
+   * Directory for temporary file uploading
+   */
+  tempFileUploadingDir: string
+
+  /**
+   *  Environment configuration
+   */
+  process: {
+    version: string
+    userAgent: string
+  }
+
+  /**
+   * Query Node endpoint URL
+   */
+  queryNodeEndpoint: string
+
+  /**
+   * Enables uploading auth-schema validation
+   */
+  enableUploadingAuth: boolean
+
+  /**
+   * ElasticSearch logging endpoint URL
+   */
+  elasticSearchEndpoint?: string
+
+  /**
+   * Max file size for uploading limit.
+   */
+  maxFileSize: number
+}

+ 16 - 12
storage-node-v2/src/services/webApi/controllers/filesApi.ts

@@ -7,7 +7,8 @@ import {
   verifyTokenSignature,
 } from '../../helpers/auth'
 import { hashFile } from '../../helpers/hashing'
-import { createNonce, getTokenExpirationTime } from '../../helpers/tokenNonceKeeper'
+import { registerNewDataObjectId } from '../../caching/newUploads'
+import { createNonce, getTokenExpirationTime } from '../../caching/tokenNonceKeeper'
 import { getFileInfo } from '../../helpers/fileInfo'
 import { BagId } from '@joystream/types/storage'
 import logger from '../../logger'
@@ -29,6 +30,7 @@ import {
   getCommandConfig,
   sendResponseWithError,
   getHttpStatusCodeByError,
+  AppConfig,
 } from './common'
 import { getStorageBucketIdsByWorkerId } from '../../sync/storageObligations'
 import { Membership } from '@joystream/types/members'
@@ -37,16 +39,12 @@ const fsPromises = fs.promises
 /**
  * A public endpoint: serves files by data object ID.
  */
-export async function getFile(req: express.Request, res: express.Response): Promise<void> {
+export async function getFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
     const uploadsDir = getUploadsDir(res)
     const fullPath = path.resolve(uploadsDir, dataObjectId)
 
-    if (dataObjectId === '1') {
-      throw new Error('Articifial file error')
-    }
-
     const fileInfo = await getFileInfo(fullPath)
     const fileStats = await fsPromises.stat(fullPath)
 
@@ -72,7 +70,7 @@ export async function getFile(req: express.Request, res: express.Response): Prom
 /**
  * A public endpoint: sends file headers by data object ID.
  */
-export async function getFileHeaders(req: express.Request, res: express.Response): Promise<void> {
+export async function getFileHeaders(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
     const uploadsDir = getUploadsDir(res)
@@ -93,7 +91,7 @@ export async function getFileHeaders(req: express.Request, res: express.Response
 /**
  * A public endpoint: receives file.
  */
-export async function uploadFile(req: express.Request, res: express.Response): Promise<void> {
+export async function uploadFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   const uploadRequest: RequestData = req.body
 
   // saved filename to delete on verification or extrinsic errors
@@ -118,6 +116,8 @@ export async function uploadFile(req: express.Request, res: express.Response): P
     const uploadsDir = getUploadsDir(res)
     const newPath = path.join(uploadsDir, uploadRequest.dataObjectId.toString())
 
+    registerNewDataObjectId(uploadRequest.dataObjectId.toString())
+
     // Overwrites existing file.
     await fsPromises.rename(fileObj.path, newPath)
     cleanupFileName = newPath
@@ -131,6 +131,7 @@ export async function uploadFile(req: express.Request, res: express.Response): P
         `Received already accepted data object. DataObjectId = ${uploadRequest.dataObjectId} WorkerId = ${workerId}`
       )
     }
+
     res.status(201).json({
       id: hash,
     })
@@ -144,7 +145,10 @@ export async function uploadFile(req: express.Request, res: express.Response): P
 /**
  * A public endpoint: creates auth token for file uploads.
  */
-export async function authTokenForUploading(req: express.Request, res: express.Response): Promise<void> {
+export async function authTokenForUploading(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
     const account = getAccount(res)
     const tokenRequest = getTokenRequest(req)
@@ -194,7 +198,7 @@ function getFileObject(req: express.Request): Express.Multer.File {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-function getAccount(res: express.Response): KeyringPair {
+function getAccount(res: express.Response<unknown, AppConfig>): KeyringPair {
   if (res.locals.storageProviderAccount) {
     return res.locals.storageProviderAccount
   }
@@ -209,7 +213,7 @@ function getAccount(res: express.Response): KeyringPair {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-function getApi(res: express.Response): ApiPromise {
+function getApi(res: express.Response<unknown, AppConfig>): ApiPromise {
   if (res.locals.api) {
     return res.locals.api
   }
@@ -331,7 +335,7 @@ async function cleanupFileOnError(cleanupFileName: string, error: string): Promi
 /**
  * A public endpoint: return the server version.
  */
-export async function getVersion(req: express.Request, res: express.Response): Promise<void> {
+export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const config = getCommandConfig(res)
 

+ 14 - 4
storage-node-v2/src/services/webApi/controllers/stateApi.ts

@@ -9,6 +9,7 @@ import {
   WebApiError,
   getCommandConfig,
   sendResponseWithError,
+  AppConfig,
 } from './common'
 import fastFolderSize from 'fast-folder-size'
 import { promisify } from 'util'
@@ -28,7 +29,10 @@ const dataCache = new NodeCache({
 /**
  * A public endpoint: return all local data objects.
  */
-export async function getAllLocalDataObjects(req: express.Request, res: express.Response): Promise<void> {
+export async function getAllLocalDataObjects(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
     const uploadsDir = getUploadsDir(res)
     const tempFileDir = getTempFileUploadingDir(res)
@@ -46,7 +50,10 @@ export async function getAllLocalDataObjects(req: express.Request, res: express.
  *
  *  @return total size and count of the data objects.
  */
-export async function getLocalDataStats(req: express.Request, res: express.Response): Promise<void> {
+export async function getLocalDataStats(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
     const uploadsDir = getUploadsDir(res)
     const tempFileDir = getTempFileUploadingDir(res)
@@ -89,7 +96,10 @@ export async function getLocalDataStats(req: express.Request, res: express.Respo
 /**
  * A public endpoint: return local data objects for the bag.
  */
-export async function getLocalDataObjectsByBagId(req: express.Request, res: express.Response): Promise<void> {
+export async function getLocalDataObjectsByBagId(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
     const uploadsDir = getUploadsDir(res)
     const tempFileDir = getTempFileUploadingDir(res)
@@ -113,7 +123,7 @@ export async function getLocalDataObjectsByBagId(req: express.Request, res: expr
 /**
  * A public endpoint: return the server version.
  */
-export async function getVersion(req: express.Request, res: express.Response): Promise<void> {
+export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const config = getCommandConfig(res)