Browse Source

Merge pull request #2881 from mnaamani/giza_staging-update-storagev2-sync5

Giza Staging: Update Storage Node v2 sync5
Mokhtar Naamani 3 years ago
parent
commit
8cbc6170ee

+ 0 - 1
storage-node-v2/.eslintrc.js

@@ -9,7 +9,6 @@ module.exports = {
   rules: {
     'no-console': 'warn', // use dedicated logger
     'no-unused-vars': 'off', // Required by the typescript rule below
-    'prettier/prettier': 'off', // prettier-eslint conflicts inherited from @joystream/eslint-config
     '@typescript-eslint/no-unused-vars': ['error'],
     '@typescript-eslint/no-floating-promises': 'error',
   },

+ 1 - 1
storage-node-v2/package.json

@@ -137,7 +137,7 @@
     "prepack": "rm -rf lib && tsc -b && oclif-dev manifest && oclif-dev readme",
     "version": "oclif-dev readme && git add README.md",
     "build": "tsc --build tsconfig.json",
-    "format": "prettier ./src --write",
+    "format": "yarn prettier ./src --write",
     "lint": "eslint ./src --ext .ts",
     "api:edit": "openapi-editor --file ./src/api-spec/openapi.yaml --port 10021",
     "generate:types:graphql": "yarn graphql-codegen -c ./src/services/queryNode/codegen.yml",

+ 4 - 3
storage-node-v2/src/commands/dev/sync.ts

@@ -27,17 +27,18 @@ export default class DevSync extends Command {
       char: 'p',
       required: false,
       description: 'Sync workers number (max async operations in progress).',
+      default: 20,
     }),
     queryNodeEndpoint: flags.string({
       char: 'q',
       required: false,
-      description: 'Query node host and port (e.g.: some.com:8081)',
       default: 'http://localhost:8081/graphql',
+      description: 'Query node endpoint (e.g.: http://some.com:8081/graphql)',
     }),
     dataSourceOperatorUrl: flags.string({
       char: 'o',
       required: false,
-      description: 'Storage node url base (e.g.: http://some.com:8081) to get data from.',
+      description: 'Storage node url base (e.g.: http://some.com:3333) to get data from.',
       default: 'http://localhost:3333',
     }),
     uploads: flags.string({
@@ -52,7 +53,7 @@ export default class DevSync extends Command {
 
     logger.info('Syncing...')
 
-    const syncWorkersNumber = flags.syncWorkersNumber ?? 20
+    const syncWorkersNumber = flags.syncWorkersNumber
 
     try {
       await performSync(

+ 1 - 1
storage-node-v2/src/commands/operator/set-metadata.ts

@@ -31,7 +31,7 @@ export default class OperatorSetMetadata extends ApiCommandBase {
     endpoint: flags.string({
       char: 'e',
       description: 'Root distribution node endpoint',
-      exclusive: ['input'],
+      exclusive: ['jsonFile'],
     }),
     jsonFile: flags.string({
       char: 'j',

+ 3 - 5
storage-node-v2/src/commands/server.ts

@@ -13,7 +13,6 @@ import { promisify } from 'util'
 import { KeyringPair } from '@polkadot/keyring/types'
 import ExitCodes from './../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'
-import { Worker } from '@joystream/types/working-group'
 import fs from 'fs'
 const fsPromises = fs.promises
 
@@ -179,10 +178,10 @@ async function runSyncWithInterval(
   syncWorkersNumber: number,
   syncIntervalMinutes: number
 ) {
-  const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 1000
+  const sleepInteval = syncIntervalMinutes * 60 * 1000
   while (true) {
     logger.info(`Sync paused for ${syncIntervalMinutes} minute(s).`)
-    await sleep(sleepIntevalInSeconds)
+    await sleep(sleepInteval)
     try {
       logger.info(`Resume syncing....`)
       await performSync(api, workerId, syncWorkersNumber, queryNodeUrl, uploadsDirectory, tempDirectory)
@@ -226,8 +225,7 @@ async function recreateTempDirectory(uploadsDirectory: string, tempDirName: stri
  */
 async function verifyWorkerId(api: ApiPromise, workerId: number, account: KeyringPair): Promise<void> {
   // Cast Codec type to Worker type
-  const workerObj = await api.query.storageWorkingGroup.workerById(workerId)
-  const worker = workerObj as Worker
+  const worker = await api.query.storageWorkingGroup.workerById(workerId)
 
   if (worker.role_account_id.toString() !== account.address) {
     throw new CLIError(`Provided worker ID doesn't match the Joystream account.`, {

+ 3 - 2
storage-node-v2/src/services/queryNode/api.ts

@@ -19,6 +19,7 @@ import {
   GetStorageBucketsConnection,
   GetStorageBucketsConnectionQuery,
   GetStorageBucketsConnectionQueryVariables,
+  GetStorageBucketDetailsByWorkerId,
 } from './generated/queries'
 import { Maybe, StorageBagWhereInput } from './generated/schema'
 
@@ -164,12 +165,12 @@ export class QueryNodeApi {
    *
    * @param workerId - worker ID
    */
-  public async getStorageBucketDetailsByWorkerId(workerId: string): Promise<Array<StorageBucketIdsFragment>> {
+  public async getStorageBucketIdsByWorkerId(workerId: string): Promise<Array<StorageBucketIdsFragment>> {
     const result = await this.multipleEntitiesWithPagination<
       StorageBucketIdsFragment,
       GetStorageBucketDetailsByWorkerIdQuery,
       GetStorageBucketDetailsByWorkerIdQueryVariables
-    >(GetStorageBucketsConnection, { workerId, limit: MAX_RESULTS_PER_QUERY }, 'storageBucketsConnection')
+    >(GetStorageBucketDetailsByWorkerId, { workerId, limit: MAX_RESULTS_PER_QUERY }, 'storageBucketsConnection')
 
     if (!result) {
       return []

+ 5 - 1
storage-node-v2/src/services/queryNode/generated/queries.ts

@@ -122,7 +122,11 @@ export const DataObjectDetails = gql`
 `
 export const GetStorageBucketsConnection = gql`
   query getStorageBucketsConnection($limit: Int, $cursor: String) {
-    storageBucketsConnection(first: $limit, after: $cursor) {
+    storageBucketsConnection(
+      first: $limit
+      after: $cursor
+      where: { operatorStatus_json: { isTypeOf_eq: "StorageBucketOperatorStatusActive" } }
+    ) {
       edges {
         cursor
         node {

+ 5 - 1
storage-node-v2/src/services/queryNode/queries/queries.graphql

@@ -4,7 +4,11 @@ fragment StorageBucketIds on StorageBucket {
 }
 
 query getStorageBucketsConnection($limit: Int, $cursor: String) {
-  storageBucketsConnection(first: $limit, after: $cursor) {
+  storageBucketsConnection(
+    first: $limit
+    after: $cursor
+    where: { operatorStatus_json: { isTypeOf_eq: "StorageBucketOperatorStatusActive" } }
+  ) {
     edges {
       cursor
       node {

+ 1 - 1
storage-node-v2/src/services/sync/remoteStorageData.ts

@@ -4,7 +4,7 @@ import logger from '../logger'
 import NodeCache from 'node-cache'
 
 // Expiration period in seconds for the local cache.
-const ExpirationPeriod: number = 5 * 60 // minutes
+const ExpirationPeriod: number = 3 * 60 // minutes
 
 // Max data entries in local cache
 const MaxEntries = 10000

+ 2 - 4
storage-node-v2/src/services/sync/storageObligations.ts

@@ -129,7 +129,7 @@ export async function getStorageObligationsFromRuntime(
 export async function getStorageBucketIdsByWorkerId(queryNodeUrl: string, workerId: number): Promise<string[]> {
   const api = new QueryNodeApi(queryNodeUrl)
 
-  const idFragments = await api.getStorageBucketDetailsByWorkerId(workerId.toString())
+  const idFragments = await api.getStorageBucketIdsByWorkerId(workerId.toString())
   const ids = idFragments.map((frag) => frag.id)
 
   return ids
@@ -161,7 +161,7 @@ async function getAllBuckets(api: QueryNodeApi): Promise<StorageBucketDetailsFra
 
   return await getAllObjectsWithPaging(
     'get all storage buckets',
-    async (offset, limit) => await api.getStorageBucketDetails(ids, offset, limit)
+    async (offset, limit) => await api.getStorageBucketDetails(ids.slice(offset, offset + limit), 0, limit)
   )
 }
 
@@ -209,8 +209,6 @@ async function getAllObjectsWithPaging<T>(
     resultPart = await query(offset, limit)
     offset += limit
     result.push(...resultPart)
-
-    if (resultPart.length < limit) break
   } while (resultPart.length > 0)
 
   return result

+ 3 - 5
storage-node-v2/src/services/sync/synchronizer.ts

@@ -1,7 +1,7 @@
 import { getStorageObligationsFromRuntime, DataObligations } from './storageObligations'
 import logger from '../../services/logger'
 import { getDataObjectIDs } from '../../services/caching/localDataObjects'
-import { SyncTask, DownloadFileTask, DeleteLocalFileTask, PrepareDownloadFileTask } from './tasks'
+import { SyncTask, DownloadFileTask, PrepareDownloadFileTask } from './tasks'
 import { WorkingStack, TaskProcessorSpawner, TaskSink } from './workingProcess'
 import _ from 'lodash'
 import { ApiPromise } from '@polkadot/api'
@@ -46,11 +46,10 @@ export async function performSync(
   const added = _.difference(requiredIds, files)
   const deleted = _.difference(files, requiredIds)
 
-  logger.debug(`Sync - added objects: ${added.length}`)
-  logger.debug(`Sync - deleted objects: ${deleted.length}`)
+  logger.debug(`Sync - new objects: ${added.length}`)
+  logger.debug(`Sync - obsolete objects: ${deleted.length}`)
 
   const workingStack = new WorkingStack()
-  const deletedTasks = deleted.map((fileName) => new DeleteLocalFileTask(uploadDirectory, fileName))
 
   let addedTasks: SyncTask[]
   if (operatorUrl === undefined) {
@@ -72,7 +71,6 @@ export async function performSync(
   const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
 
   await workingStack.add(addedTasks)
-  await workingStack.add(deletedTasks)
 
   await processSpawner.process()
   logger.info('Sync ended.')

+ 1 - 1
storage-node-v2/src/services/sync/tasks.ts

@@ -100,7 +100,7 @@ export class DownloadFileTask implements SyncTask {
       const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
-      const request = superagent.get(this.url).timeout(timeoutMs) as unknown as NodeJS.ReadableStream
+      const request = (superagent.get(this.url).timeout(timeoutMs) as unknown) as NodeJS.ReadableStream
       const fileStream = fs.createWriteStream(tempFilePath)
 
       request.on('response', (res) => {

+ 1 - 79
storage-node-v2/src/services/webApi/controllers/common.ts

@@ -28,84 +28,6 @@ export class ServerError extends WebApiError {
   }
 }
 
-/**
- * Returns a directory for file uploading from the response.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-export function getUploadsDir(res: express.Response<unknown, AppConfig>): string {
-  if (res.locals.uploadsDir) {
-    return res.locals.uploadsDir
-  }
-
-  throw new ServerError('No upload directory path loaded.')
-}
-
-/**
- * Returns a directory for temporary file uploading from the response.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-export function getTempFileUploadingDir(res: express.Response<unknown, AppConfig>): string {
-  if (res.locals.tempFileUploadingDir) {
-    return res.locals.tempFileUploadingDir
-  }
-
-  throw new ServerError('No temporary uploading directory path loaded.')
-}
-
-/**
- * Returns worker ID from the response.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-export function getWorkerId(res: express.Response<unknown, AppConfig>): number {
-  if (res.locals.workerId || res.locals.workerId === 0) {
-    return res.locals.workerId
-  }
-
-  throw new ServerError('No Joystream worker ID loaded.')
-}
-
-/**
- * Returns the QueryNode URL from the starting parameters.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-export function getQueryNodeUrl(res: express.Response<unknown, AppConfig>): string {
-  if (res.locals.queryNodeEndpoint) {
-    return res.locals.queryNodeEndpoint
-  }
-
-  throw new ServerError('No Query Node URL loaded.')
-}
-
-/**
- * Returns a command config.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-export function getCommandConfig(res: express.Response<unknown, AppConfig>): {
-  version: string
-  userAgent: string
-} {
-  if (res.locals.process) {
-    return res.locals.process
-  }
-
-  throw new ServerError('Cannot load command config.')
-}
-
 /**
  * Handles errors and sends a response.
  *
@@ -146,7 +68,7 @@ export function getHttpStatusCodeByError(err: Error): number {
   }
 
   if (err instanceof ExtrinsicFailedError) {
-    return 400
+    return 500
   }
 
   if (err instanceof WebApiError) {

+ 18 - 54
storage-node-v2/src/services/webApi/controllers/filesApi.ts

@@ -13,7 +13,6 @@ import { createNonce, getTokenExpirationTime } from '../../caching/tokenNonceKee
 import { getFileInfo } from '../../helpers/fileInfo'
 import { BagId } from '@joystream/types/storage'
 import logger from '../../logger'
-import { KeyringPair } from '@polkadot/keyring/types'
 import { ApiPromise } from '@polkadot/api'
 import * as express from 'express'
 import fs from 'fs'
@@ -22,17 +21,7 @@ import send from 'send'
 import { hexToString } from '@polkadot/util'
 import { parseBagId } from '../../helpers/bagTypes'
 import { timeout } from 'promise-timeout'
-import {
-  getUploadsDir,
-  getWorkerId,
-  getQueryNodeUrl,
-  WebApiError,
-  ServerError,
-  getCommandConfig,
-  sendResponseWithError,
-  getHttpStatusCodeByError,
-  AppConfig,
-} from './common'
+import { WebApiError, sendResponseWithError, getHttpStatusCodeByError, AppConfig } from './common'
 import { getStorageBucketIdsByWorkerId } from '../../sync/storageObligations'
 import { Membership } from '@joystream/types/members'
 const fsPromises = fs.promises
@@ -43,7 +32,7 @@ const fsPromises = fs.promises
 export async function getFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
-    const uploadsDir = getUploadsDir(res)
+    const uploadsDir = res.locals.uploadsDir
     const fullPath = path.resolve(uploadsDir, dataObjectId)
 
     const fileInfo = await getFileInfo(fullPath)
@@ -74,7 +63,7 @@ export async function getFile(req: express.Request, res: express.Response<unknow
 export async function getFileHeaders(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
-    const uploadsDir = getUploadsDir(res)
+    const uploadsDir = res.locals.uploadsDir
     const fullPath = path.resolve(uploadsDir, dataObjectId)
     const fileInfo = await getFileInfo(fullPath)
     const fileStats = await fsPromises.stat(fullPath)
@@ -100,21 +89,21 @@ export async function uploadFile(req: express.Request, res: express.Response<unk
   try {
     const fileObj = getFileObject(req)
     cleanupFileName = fileObj.path
-    const queryNodeUrl = getQueryNodeUrl(res)
-    const workerId = getWorkerId(res)
+    const queryNodeUrl = res.locals.queryNodeEndpoint
+    const workerId = res.locals.workerId
 
     const [, hash] = await Promise.all([
       verifyBucketId(queryNodeUrl, workerId, uploadRequest.storageBucketId),
       hashFile(fileObj.path),
     ])
 
-    const api = getApi(res)
+    const api = res.locals.api
     const bagId = parseBagId(uploadRequest.bagId)
     const accepted = await verifyDataObjectInfo(api, bagId, uploadRequest.dataObjectId, fileObj.size, hash)
 
     // Prepare new file name
     const dataObjectId = uploadRequest.dataObjectId.toString()
-    const uploadsDir = getUploadsDir(res)
+    const uploadsDir = res.locals.uploadsDir
     const newPath = path.join(uploadsDir, dataObjectId)
 
     registerNewDataObjectId(dataObjectId)
@@ -125,9 +114,14 @@ export async function uploadFile(req: express.Request, res: express.Response<unk
     cleanupFileName = newPath
 
     if (!accepted) {
-      await acceptPendingDataObjects(api, bagId, getAccount(res), workerId, uploadRequest.storageBucketId, [
-        uploadRequest.dataObjectId,
-      ])
+      await acceptPendingDataObjects(
+        api,
+        bagId,
+        res.locals.storageProviderAccount,
+        workerId,
+        uploadRequest.storageBucketId,
+        [uploadRequest.dataObjectId]
+      )
     } else {
       logger.warn(
         `Received already accepted data object. DataObjectId = ${uploadRequest.dataObjectId} WorkerId = ${workerId}`
@@ -152,9 +146,9 @@ export async function authTokenForUploading(
   res: express.Response<unknown, AppConfig>
 ): Promise<void> {
   try {
-    const account = getAccount(res)
+    const account = res.locals.storageProviderAccount
     const tokenRequest = getTokenRequest(req)
-    const api = getApi(res)
+    const api = res.locals.api
 
     await validateTokenRequest(api, tokenRequest)
 
@@ -193,36 +187,6 @@ function getFileObject(req: express.Request): Express.Multer.File {
   throw new WebApiError('No file uploaded', 400)
 }
 
-/**
- * Returns a KeyPair instance from the response.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-function getAccount(res: express.Response<unknown, AppConfig>): KeyringPair {
-  if (res.locals.storageProviderAccount) {
-    return res.locals.storageProviderAccount
-  }
-
-  throw new ServerError('No Joystream account loaded.')
-}
-
-/**
- * Returns API promise from the response.
- *
- * @remarks
- * This is a helper function. It parses the response object for a variable and
- * throws an error on failure.
- */
-function getApi(res: express.Response<unknown, AppConfig>): ApiPromise {
-  if (res.locals.api) {
-    return res.locals.api
-  }
-
-  throw new ServerError('No Joystream API loaded.')
-}
-
 /**
  * Returns data object ID from the request.
  *
@@ -339,7 +303,7 @@ async function cleanupFileOnError(cleanupFileName: string, error: string): Promi
  */
 export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
-    const config = getCommandConfig(res)
+    const config = res.locals.process
 
     // Copy from an object, because the actual object could contain more data.
     res.status(200).json({

+ 5 - 13
storage-node-v2/src/services/webApi/controllers/stateApi.ts

@@ -2,15 +2,7 @@ import { getDataObjectIDs } from '../../../services/caching/localDataObjects'
 import * as express from 'express'
 import _ from 'lodash'
 import { getDataObjectIDsByBagId } from '../../sync/storageObligations'
-import {
-  getUploadsDir,
-  getTempFileUploadingDir,
-  getQueryNodeUrl,
-  WebApiError,
-  getCommandConfig,
-  sendResponseWithError,
-  AppConfig,
-} from './common'
+import { WebApiError, sendResponseWithError, AppConfig } from './common'
 import fastFolderSize from 'fast-folder-size'
 import { promisify } from 'util'
 import fs from 'fs'
@@ -52,8 +44,8 @@ export async function getLocalDataStats(
   res: express.Response<unknown, AppConfig>
 ): Promise<void> {
   try {
-    const uploadsDir = getUploadsDir(res)
-    const tempFileDir = getTempFileUploadingDir(res)
+    const uploadsDir = res.locals.uploadsDir
+    const tempFileDir = res.locals.tempFileUploadingDir
     const fastFolderSizeAsync = promisify(fastFolderSize)
 
     const tempFolderExists = fs.existsSync(tempFileDir)
@@ -98,7 +90,7 @@ export async function getLocalDataObjectsByBagId(
   res: express.Response<unknown, AppConfig>
 ): Promise<void> {
   try {
-    const queryNodeUrl = getQueryNodeUrl(res)
+    const queryNodeUrl = res.locals.queryNodeEndpoint
     const bagId = getBagId(req)
 
     const [ids, requiredIds] = await Promise.all([
@@ -119,7 +111,7 @@ export async function getLocalDataObjectsByBagId(
  */
 export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
-    const config = getCommandConfig(res)
+    const config = res.locals.process
 
     // Copy from an object, because the actual object could contain more data.
     res.status(200).json({