Quellcode durchsuchen

storage-node-v2: Fix PR comments.

- add syncWorkersTimeout flag
- remove worker ID verification
- remove transaction nonce cache
- removed unnecessary locks
Shamil Gadelshin vor 3 Jahren
Ursprung
Commit
bf88cf1d16

+ 8 - 3
storage-node-v2/src/commands/dev/sync.ts

@@ -29,6 +29,12 @@ export default class DevSync extends Command {
       description: 'Sync workers number (max async operations in progress).',
       default: 20,
     }),
+    syncWorkersTimeout: flags.integer({
+      char: 't',
+      required: false,
+      description: 'Asset downloading timeout for the syncronization (in minutes).',
+      default: 30,
+    }),
     queryNodeEndpoint: flags.string({
       char: 'q',
       required: false,
@@ -53,13 +59,12 @@ export default class DevSync extends Command {
 
     logger.info('Syncing...')
 
-    const syncWorkersNumber = flags.syncWorkersNumber
-
     try {
       await performSync(
         undefined,
         flags.workerId,
-        syncWorkersNumber,
+        flags.syncWorkersNumber,
+        flags.syncWorkersTimeout,
         flags.queryNodeEndpoint,
         flags.uploads,
         flags.dataSourceOperatorUrl

+ 19 - 26
storage-node-v2/src/commands/server.ts

@@ -10,9 +10,7 @@ import rimraf from 'rimraf'
 import _ from 'lodash'
 import path from 'path'
 import { promisify } from 'util'
-import { KeyringPair } from '@polkadot/keyring/types'
 import ExitCodes from './../command-base/ExitCodes'
-import { CLIError } from '@oclif/errors'
 import fs from 'fs'
 const fsPromises = fs.promises
 
@@ -64,6 +62,12 @@ export default class Server extends ApiCommandBase {
       description: 'Sync workers number (max async operations in progress).',
       default: 20,
     }),
+    syncWorkersTimeout: flags.integer({
+      char: 't',
+      required: false,
+      description: 'Asset downloading timeout for the syncronization (in minutes).',
+      default: 30,
+    }),
     elasticSearchEndpoint: flags.string({
       char: 'e',
       required: false,
@@ -108,6 +112,7 @@ Supported values: warn, error, debug, info. Default:debug`,
             flags.uploads,
             TempDirName,
             flags.syncWorkersNumber,
+            flags.syncWorkersTimeout,
             flags.syncInterval
           ),
         0
@@ -116,8 +121,6 @@ Supported values: warn, error, debug, info. Default:debug`,
 
     const storageProviderAccount = this.getAccount(flags)
 
-    await verifyWorkerId(api, flags.worker, storageProviderAccount)
-
     try {
       const port = flags.port
       const workerId = flags.worker
@@ -159,6 +162,7 @@ Supported values: warn, error, debug, info. Default:debug`,
  * @param uploadsDir - data uploading directory
  * @param tempDirectory - temporary data uploading directory
  * @param syncWorkersNumber - defines a number of the async processes for sync
+ * @param syncWorkersTimeout - downloading asset timeout
  * @param syncIntervalMinutes - defines an interval between sync runs
  *
  * @returns void promise.
@@ -170,15 +174,24 @@ async function runSyncWithInterval(
   uploadsDirectory: string,
   tempDirectory: string,
   syncWorkersNumber: number,
+  syncWorkersTimeout: number,
   syncIntervalMinutes: number
 ) {
-  const sleepInteval = syncIntervalMinutes * 60 * 1000
+  const sleepInteval = syncIntervalMinutes * 60 * 100
   while (true) {
     logger.info(`Sync paused for ${syncIntervalMinutes} minute(s).`)
     await sleep(sleepInteval)
     try {
       logger.info(`Resume syncing....`)
-      await performSync(api, workerId, syncWorkersNumber, queryNodeUrl, uploadsDirectory, tempDirectory)
+      await performSync(
+        api,
+        workerId,
+        syncWorkersNumber,
+        syncWorkersTimeout,
+        queryNodeUrl,
+        uploadsDirectory,
+        tempDirectory
+      )
     } catch (err) {
       logger.error(`Critical sync error: ${err}`)
     }
@@ -207,23 +220,3 @@ async function recreateTempDirectory(uploadsDirectory: string, tempDirName: stri
     logger.error(`Temp directory IO error: ${err}`)
   }
 }
-
-/**
- * Verifies the worker ID from the command line argument and provided Joystream account.
- * It throws an error when not matched.
- *
- * @param api - runtime API promise
- * @param workerId - worker ID from the command line arguments
- * @param account - Joystream account KeyringPair
- * @returns void promise.
- */
-async function verifyWorkerId(api: ApiPromise, workerId: number, account: KeyringPair): Promise<void> {
-  // Cast Codec type to Worker type
-  const worker = await api.query.storageWorkingGroup.workerById(workerId)
-
-  if (worker.role_account_id.toString() !== account.address) {
-    throw new CLIError(`Provided worker ID doesn't match the Joystream account.`, {
-      exit: ExitCodes.InvalidWorkerId,
-    })
-  }
-}

+ 0 - 61
storage-node-v2/src/services/caching/transactionNonceKeeper.ts

@@ -1,61 +0,0 @@
-import { KeyringPair } from '@polkadot/keyring/types'
-import type { Index } from '@polkadot/types/interfaces/runtime'
-import BN from 'bn.js'
-import AwaitLock from 'await-lock'
-import { ApiPromise } from '@polkadot/api'
-import logger from '../logger'
-import NodeCache from 'node-cache'
-
-// Expiration period in seconds for the nonce cache.
-const NonceExpirationPeriod = 180 // seconds
-
-// Local in-memory cache for nonces.
-const nonceCache = new NodeCache({
-  stdTTL: NonceExpirationPeriod,
-  deleteOnExpire: true,
-})
-
-const nonceEntryName = 'transaction_nonce'
-const lock = new AwaitLock()
-
-/**
- * Return the current transaction nonce for an account from the runtime.
- *
- * @param api - runtime API promise
- * @param account - KeyPair instance
- * @returns promise with transaction nonce for a given account.
- *
- */
-export async function getTransactionNonce(api: ApiPromise, account: KeyringPair): Promise<Index> {
-  await lock.acquireAsync()
-  try {
-    let nonce: Index | undefined = nonceCache.get(nonceEntryName)
-    if (nonce === undefined) {
-      nonce = await api.rpc.system.accountNextIndex(account.address)
-    } else {
-      nonce = nonce.add(new BN(1)) as Index
-    }
-
-    nonceCache.set(nonceEntryName, nonce)
-
-    logger.debug(`Last transaction nonce:${nonce}`)
-    return nonce as Index
-  } finally {
-    lock.release()
-  }
-}
-
-/**
- * Drops the transaction nonce cache.
- *
- * @returns empty promise.
- *
- */
-export async function resetTransactionNonceCache(): Promise<void> {
-  await lock.acquireAsync()
-  nonceCache.del(nonceEntryName)
-
-  logger.debug(`Transaction node cache was dropped.`)
-
-  lock.release()
-}

+ 69 - 24
storage-node-v2/src/services/runtime/api.ts

@@ -6,11 +6,12 @@ import { TypeRegistry } from '@polkadot/types'
 import { KeyringPair } from '@polkadot/keyring/types'
 import { SubmittableExtrinsic, AugmentedEvent } from '@polkadot/api/types'
 import { DispatchError, DispatchResult } from '@polkadot/types/interfaces/system'
-import { getTransactionNonce, resetTransactionNonceCache } from '../caching/transactionNonceKeeper'
 import logger from '../../services/logger'
 import ExitCodes from '../../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'
 import stringify from 'fast-safe-stringify'
+import sleep from 'sleep-promise'
+import AwaitLock from 'await-lock'
 
 /**
  * Dedicated error for the failed extrinsics.
@@ -29,27 +30,57 @@ export async function createApi(apiUrl: string): Promise<ApiPromise> {
 
   const api = new ApiPromise({ provider, types })
   await api.isReadyOrError
+  await untilChainIsSynced(api)
 
   api.on('error', (err) => logger.error(`Api promise error: ${err.target?._url}`, { err }))
 
   return api
 }
 
+/**
+ * Awaits the chain to be fully synchronized.
+ */
+async function untilChainIsSynced(api: ApiPromise) {
+  logger.info('Waiting for chain to be synced before proceeding.')
+  while (true) {
+    const isSyncing = await chainIsSyncing(api)
+    if (isSyncing) {
+      logger.info('Still waiting for chain to be synced.')
+      await sleep(1 * 30 * 1000)
+    } else {
+      return
+    }
+  }
+}
+
+/**
+ * Checks the chain sync status.
+ *
+ * @param api api promise
+ * @returns
+ */
+async function chainIsSyncing(api: ApiPromise) {
+  const { isSyncing } = await api.rpc.system.health()
+  return isSyncing.isTrue
+}
+
+const lock = new AwaitLock()
+
 /**
  * Sends an extrinsic to the runtime and follows the result.
  *
  * @param api - API promise
  * @param account - KeyPair instance
  * @param tx - runtime transaction object to send
- * @param nonce - transaction nonce for a given account.
  * @returns extrinsic result promise.
  */
-function sendExtrinsic(
+async function sendExtrinsic(
   api: ApiPromise,
   account: KeyringPair,
-  tx: SubmittableExtrinsic<'promise'>,
-  nonce: Index
+  tx: SubmittableExtrinsic<'promise'>
 ): Promise<ISubmittableResult> {
+  const nonce = await lockAndGetNonce(api, account)
+
   return new Promise((resolve, reject) => {
     let unsubscribe: () => void
     tx.signAndSend(account, { nonce }, (result) => {
@@ -107,7 +138,9 @@ function sendExtrinsic(
         )
       }
     })
-      .then((unsubFunc) => (unsubscribe = unsubFunc))
+      .then((unsubFunc) => {
+        unsubscribe = unsubFunc
+      })
       .catch((e) =>
         reject(
           new ExtrinsicFailedError(`Cannot send the extrinsic: ${e.message ? e.message : stringify(e)}`, {
@@ -115,9 +148,28 @@ function sendExtrinsic(
           })
         )
       )
+      .finally(() => lock.release())
   })
 }
 
+/**
+ * Set the API lock and gets the last account nonce. It removes the lock on
+ * exception and rethrows the error.
+ *
+ * @param api runtime API promise
+ * @param account account to get the last nonce from.
+ * @returns
+ */
+async function lockAndGetNonce(api: ApiPromise, account: KeyringPair): Promise<Index> {
+  await lock.acquireAsync()
+  try {
+    return await api.rpc.system.accountNextIndex(account.address)
+  } catch (err) {
+    lock.release()
+    throw err
+  }
+}
+
 /**
  * Helper function for formatting dispatch error.
  *
@@ -155,25 +207,18 @@ export async function sendAndFollowNamedTx<T>(
   sudoCall = false,
   eventParser: ((result: ISubmittableResult) => T) | null = null
 ): Promise<T | void> {
-  try {
-    logger.debug(`Sending ${tx.method.section}.${tx.method.method} extrinsic...`)
-    if (sudoCall) {
-      tx = api.tx.sudo.sudo(tx)
-    }
-    const nonce = await getTransactionNonce(api, account)
-
-    const result = await sendExtrinsic(api, account, tx, nonce)
-    let eventResult: T | void
-    if (eventParser) {
-      eventResult = eventParser(result)
-    }
-    logger.debug(`Extrinsic successful!`)
-
-    return eventResult
-  } catch (err) {
-    await resetTransactionNonceCache()
-    throw err
+  logger.debug(`Sending ${tx.method.section}.${tx.method.method} extrinsic...`)
+  if (sudoCall) {
+    tx = api.tx.sudo.sudo(tx)
   }
+  const result = await sendExtrinsic(api, account, tx)
+  let eventResult: T | void
+  if (eventParser) {
+    eventResult = eventParser(result)
+  }
+  logger.debug(`Extrinsic successful!`)
+
+  return eventResult
 }
 
 /**

+ 23 - 6
storage-node-v2/src/services/sync/synchronizer.ts

@@ -20,6 +20,7 @@ export const TempDirName = 'temp'
  * @param api - (optional) runtime API promise
  * @param workerId - current storage provider ID
  * @param asyncWorkersNumber - maximum parallel downloads number
+ * @param asyncWorkersTimeout - downloading asset timeout
  * @param queryNodeUrl - Query Node endpoint URL
  * @param uploadDirectory - local directory to get file names from
  * @param operatorUrl - (optional) defines the data source URL. If not set
@@ -30,6 +31,7 @@ export async function performSync(
   api: ApiPromise | undefined,
   workerId: number,
   asyncWorkersNumber: number,
+  asyncWorkersTimeout: number,
   queryNodeUrl: string,
   uploadDirectory: string,
   tempDirectory: string,
@@ -60,10 +62,11 @@ export async function performSync(
       added,
       uploadDirectory,
       tempDirectory,
-      workingStack
+      workingStack,
+      asyncWorkersTimeout
     )
   } else {
-    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory)
+    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory, asyncWorkersTimeout)
   }
 
   logger.debug(`Sync - started processing...`)
@@ -85,6 +88,7 @@ export async function performSync(
  * @param uploadDirectory - local directory for data uploading
  * @param tempDirectory - local directory for temporary data uploading
  * @param taskSink - a destination for the newly created tasks
+ * @param asyncWorkersTimeout - downloading asset timeout
  */
 async function getPrepareDownloadTasks(
   api: ApiPromise | undefined,
@@ -93,7 +97,8 @@ async function getPrepareDownloadTasks(
   addedIds: string[],
   uploadDirectory: string,
   tempDirectory: string,
-  taskSink: TaskSink
+  taskSink: TaskSink,
+  asyncWorkersTimeout: number
 ): Promise<PrepareDownloadFileTask[]> {
   const bagIdByDataObjectId = new Map()
   for (const entry of dataObligations.dataObjects) {
@@ -134,7 +139,16 @@ async function getPrepareDownloadTasks(
       }
     }
 
-    return new PrepareDownloadFileTask(operatorUrls, bagId, id, uploadDirectory, tempDirectory, taskSink, api)
+    return new PrepareDownloadFileTask(
+      operatorUrls,
+      bagId,
+      id,
+      uploadDirectory,
+      tempDirectory,
+      taskSink,
+      asyncWorkersTimeout,
+      api
+    )
   })
 
   return tasks
@@ -147,15 +161,18 @@ async function getPrepareDownloadTasks(
  * @param addedIds - data object IDs to download
  * @param uploadDirectory - local directory for data uploading
  * @param tempDirectory - local directory for temporary data uploading
+ * @param downloadTimeout - asset downloading timeout (in minutes)
  */
 async function getDownloadTasks(
   operatorUrl: string,
   addedIds: string[],
   uploadDirectory: string,
-  tempDirectory: string
+  tempDirectory: string,
+  downloadTimeout: number
 ): Promise<DownloadFileTask[]> {
   const addedTasks = addedIds.map(
-    (fileName) => new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory)
+    (fileName) =>
+      new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory, downloadTimeout)
   )
 
   return addedTasks

+ 10 - 4
storage-node-v2/src/services/sync/tasks.ts

@@ -71,18 +71,20 @@ export class DownloadFileTask implements SyncTask {
   uploadsDirectory: string
   tempDirectory: string
   url: string
-
+  downloadTimeout: number
   constructor(
     baseUrl: string,
     dataObjectId: string,
     expectedHash: string | undefined,
     uploadsDirectory: string,
-    tempDirectory: string
+    tempDirectory: string,
+    downloadTimeout: number
   ) {
     this.dataObjectId = dataObjectId
     this.expectedHash = expectedHash
     this.uploadsDirectory = uploadsDirectory
     this.tempDirectory = tempDirectory
+    this.downloadTimeout = downloadTimeout
     this.url = urljoin(baseUrl, 'api/v1/files', dataObjectId)
   }
 
@@ -97,7 +99,7 @@ export class DownloadFileTask implements SyncTask {
     // This partial downloads will be cleaned up during the next sync iteration.
     const tempFilePath = path.join(this.uploadsDirectory, this.tempDirectory, uuidv4())
     try {
-      const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
+      const timeoutMs = this.downloadTimeout * 60 * 1000
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
       const request = (superagent.get(this.url).timeout(timeoutMs) as unknown) as NodeJS.ReadableStream
@@ -149,6 +151,7 @@ export class PrepareDownloadFileTask implements SyncTask {
   uploadsDirectory: string
   tempDirectory: string
   api?: ApiPromise
+  downloadTimeout: number
 
   constructor(
     operatorUrlCandidates: string[],
@@ -157,6 +160,7 @@ export class PrepareDownloadFileTask implements SyncTask {
     uploadsDirectory: string,
     tempDirectory: string,
     taskSink: TaskSink,
+    downloadTimeout: number,
     api?: ApiPromise
   ) {
     this.api = api
@@ -166,6 +170,7 @@ export class PrepareDownloadFileTask implements SyncTask {
     this.operatorUrlCandidates = operatorUrlCandidates
     this.uploadsDirectory = uploadsDirectory
     this.tempDirectory = tempDirectory
+    this.downloadTimeout = downloadTimeout
   }
 
   description(): string {
@@ -209,7 +214,8 @@ export class PrepareDownloadFileTask implements SyncTask {
             this.dataObjectId,
             hash,
             this.uploadsDirectory,
-            this.tempDirectory
+            this.tempDirectory,
+            this.downloadTimeout
           )
 
           return this.taskSink.add([newTask])

+ 0 - 8
storage-node-v2/src/services/sync/workingProcess.ts

@@ -1,4 +1,3 @@
-import AwaitLock from 'await-lock'
 import sleep from 'sleep-promise'
 import { SyncTask } from './tasks'
 import logger from '../../services/logger'
@@ -32,17 +31,13 @@ export interface TaskSource {
  */
 export class WorkingStack implements TaskSink, TaskSource {
   workingStack: SyncTask[]
-  lock: AwaitLock
 
   constructor() {
     this.workingStack = []
-    this.lock = new AwaitLock()
   }
 
   async get(): Promise<SyncTask | null> {
-    await this.lock.acquireAsync()
     const task = this.workingStack.pop()
-    this.lock.release()
 
     if (task !== undefined) {
       return task
@@ -52,12 +47,9 @@ export class WorkingStack implements TaskSink, TaskSource {
   }
 
   async add(tasks: SyncTask[]): Promise<void> {
-    await this.lock.acquireAsync()
-
     if (tasks !== null) {
       this.workingStack.push(...tasks)
     }
-    this.lock.release()
   }
 }