
Merge remote-tracking branch 'shamil/storage_node_v2_giza_staging_patch' into deploy-giza-playground-new-query-node

Mokhtar Naamani 3 years ago
Parent
Commit
bcd40bceea
33 changed files with 1563 additions and 1294 deletions
  1. colossus.Dockerfile (+4 -4)
  2. storage-node-v2/README.md (+36 -29)
  3. storage-node-v2/package.json (+5 -2)
  4. storage-node-v2/scripts/generate-test-data.ts (+0 -200)
  5. storage-node-v2/scripts/init-dev-bucket.sh (+4 -3)
  6. storage-node-v2/scripts/run-all-commands.sh (+3 -2)
  7. storage-node-v2/src/api-spec/openapi.yaml (+4 -4)
  8. storage-node-v2/src/command-base/ExitCodes.ts (+1 -0)
  9. storage-node-v2/src/commands/dev/sync.ts (+14 -8)
  10. storage-node-v2/src/commands/server.ts (+59 -38)
  11. storage-node-v2/src/services/caching/localDataObjects.ts (+84 -0)
  12. storage-node-v2/src/services/caching/newUploads.ts (+35 -0)
  13. storage-node-v2/src/services/caching/tokenNonceKeeper.ts (+0 -0)
  14. storage-node-v2/src/services/caching/transactionNonceKeeper.ts (+0 -0)
  15. storage-node-v2/src/services/helpers/auth.ts (+4 -3)
  16. storage-node-v2/src/services/helpers/bagTypes.ts (+9 -21)
  17. storage-node-v2/src/services/logger.ts (+11 -22)
  18. storage-node-v2/src/services/metadata/validationService.ts (+2 -1)
  19. storage-node-v2/src/services/queryNode/api.ts (+126 -34)
  20. storage-node-v2/src/services/queryNode/generated/queries.ts (+138 -21)
  21. storage-node-v2/src/services/queryNode/generated/schema.ts (+332 -478)
  22. storage-node-v2/src/services/queryNode/queries/queries.graphql (+76 -11)
  23. storage-node-v2/src/services/runtime/api.ts (+5 -4)
  24. storage-node-v2/src/services/runtime/extrinsics.ts (+1 -1)
  25. storage-node-v2/src/services/sync/storageObligations.ts (+13 -18)
  26. storage-node-v2/src/services/sync/synchronizer.ts (+44 -45)
  27. storage-node-v2/src/services/sync/tasks.ts (+102 -21)
  28. storage-node-v2/src/services/sync/workingProcess.ts (+4 -2)
  29. storage-node-v2/src/services/webApi/app.ts (+6 -69)
  30. storage-node-v2/src/services/webApi/controllers/common.ts (+70 -13)
  31. storage-node-v2/src/services/webApi/controllers/filesApi.ts (+24 -14)
  32. storage-node-v2/src/services/webApi/controllers/stateApi.ts (+17 -29)
  33. yarn.lock (+330 -197)

+ 4 - 4
colossus.Dockerfile

@@ -3,7 +3,7 @@ FROM --platform=linux/x86-64 node:14 as builder
 WORKDIR /joystream
 COPY . /joystream
 
-RUN yarn
+RUN yarn --frozen-lockfile
 
 RUN yarn workspace @joystream/types build
 RUN yarn workspace @joystream/metadata-protobuf build
@@ -15,14 +15,14 @@ VOLUME ["/data", "/keystore"]
 # Required variables
 ENV WS_PROVIDER_ENDPOINT_URI=ws://not-set
 ENV COLOSSUS_PORT=3333
-ENV QUERY_NODE_HOST=not-set
-ENV WORKER_ID=
+ENV QUERY_NODE_ENDPOINT=http://not-set/graphql
+ENV WORKER_ID=not-set
 # - set external key file using the `/keystore` volume
 ENV ACCOUNT_KEYFILE=
 ENV ACCOUNT_PWD=
 # Optional variables
 ENV SYNC_INTERVAL=1
-ENV ELASTIC_SEARCH_HOST=
+ENV ELASTIC_SEARCH_ENDPOINT=
 # warn, error, debug, info
 ENV ELASTIC_LOG_LEVEL=debug
 # - overrides account key file

+ 36 - 29
storage-node-v2/README.md

@@ -53,7 +53,7 @@ API endpoints:
 
 ### Auth schema description
 
-To reduce the possibility of abuse of the uploading endpoint we implemented a simple authentication schema. On each uploading attempt, the client should receive the auth token first and provide it as a special header. The token has an expiration time and cannot be reused. To receive such token the client should be part of the StorageWorkingGroup and have  `WorkerId`.
+To reduce the possibility of abuse of the uploading endpoint we implemented a simple authentication schema. On each uploading attempt, the client should receive the auth token first and provide it as a special header. The token has an expiration time and cannot be reused. To receive such a token the client should have Joystream Membership and `MemberId`.
 
 
 ### CLI
@@ -142,7 +142,7 @@ $ yarn storage-node server --apiUrl ws://localhost:9944  -w 0 --accountUri //Ali
 ### Prerequisites
 - accountURI or keyfile and password
 - workerId from the Storage working group that matches with the account above
-- Joystream Network validator URL
+- Joystream node websocket endpoint URL
 - QueryNode URL
 - (optional) ElasticSearch URL
 - created directory for data uploading
@@ -223,17 +223,18 @@ USAGE
   $ storage-node dev:sync
 
 OPTIONS
-  -d, --uploads=uploads                                (required) Data uploading directory (absolute path).
-  -h, --help                                           show CLI help
+  -d, --uploads=uploads                              (required) Data uploading directory (absolute path).
+  -h, --help                                         show CLI help
 
-  -o, --dataSourceOperatorHost=dataSourceOperatorHost  Storage node host and port (e.g.: some.com:8081) to get data
-                                                       from.
+  -o, --dataSourceOperatorUrl=dataSourceOperatorUrl  [default: http://localhost:3333] Storage node url base (e.g.:
+                                                     http://some.com:8081) to get data from.
 
-  -p, --syncWorkersNumber=syncWorkersNumber            Sync workers number (max async operations in progress).
+  -p, --syncWorkersNumber=syncWorkersNumber          Sync workers number (max async operations in progress).
 
-  -q, --queryNodeHost=queryNodeHost                    Query node host and port (e.g.: some.com:8081)
+  -q, --queryNodeEndpoint=queryNodeEndpoint          [default: http://localhost:8081/graphql] Query node host and port
+                                                     (e.g.: some.com:8081)
 
-  -w, --workerId=workerId                              (required) Storage node operator worker ID.
+  -w, --workerId=workerId                            (required) Storage node operator worker ID.
 ```
 
 _See code: [src/commands/dev/sync.ts](https://github.com/Joystream/joystream/blob/v2.0.0/src/commands/dev/sync.ts)_
@@ -711,39 +712,45 @@ USAGE
   $ storage-node server
 
 OPTIONS
-  -a, --disableUploadAuth                    Disable uploading authentication (should be used in testing-context only).
-  -d, --uploads=uploads                      (required) Data uploading directory (absolute path).
+  -a, --disableUploadAuth                            Disable uploading authentication (should be used in testing-context
+                                                     only).
 
-  -e, --elasticSearchHost=elasticSearchHost  Elasticsearch host and port (e.g.: some.com:8081).
-                                             Log level could be set using the ELASTIC_LOG_LEVEL enviroment variable.
-                                             Supported values: warn, error, debug, info. Default:debug
+  -d, --uploads=uploads                              (required) Data uploading directory (absolute path).
 
-  -h, --help                                 show CLI help
+  -e, --elasticSearchEndpoint=elasticSearchEndpoint  Elasticsearch endpoint (e.g.: http://some.com:8081).
+                                                     Log level could be set using the ELASTIC_LOG_LEVEL enviroment
+                                                     variable.
+                                                     Supported values: warn, error, debug, info. Default:debug
 
-  -i, --syncInterval=syncInterval            [default: 1] Interval between synchronizations (in minutes)
+  -h, --help                                         show CLI help
 
-  -k, --keyFile=keyFile                      Key file for the account. Mandatory in non-dev environment.
+  -i, --syncInterval=syncInterval                    [default: 1] Interval between synchronizations (in minutes)
 
-  -m, --dev                                  Use development mode
+  -k, --keyFile=keyFile                              Key file for the account. Mandatory in non-dev environment.
 
-  -o, --port=port                            (required) Server port.
+  -m, --dev                                          Use development mode
 
-  -p, --password=password                    Key file password (optional). Could be overriden by ACCOUNT_PWD environment
-                                             variable.
+  -o, --port=port                                    (required) Server port.
 
-  -q, --queryNodeHost=queryNodeHost          (required) Query node host and port (e.g.: some.com:8081)
+  -p, --password=password                            Key file password (optional). Could be overriden by ACCOUNT_PWD
+                                                     environment variable.
 
-  -r, --syncWorkersNumber=syncWorkersNumber  [default: 20] Sync workers number (max async operations in progress).
+  -q, --queryNodeEndpoint=queryNodeEndpoint          (required) [default: http://localhost:8081/graphql] Query node
+                                                     endpoint (e.g.: http://some.com:8081/graphql)
 
-  -s, --sync                                 Enable data synchronization.
+  -r, --syncWorkersNumber=syncWorkersNumber          [default: 20] Sync workers number (max async operations in
+                                                     progress).
 
-  -u, --apiUrl=apiUrl                        [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev
-                                             environment.
+  -s, --sync                                         Enable data synchronization.
 
-  -w, --worker=worker                        (required) Storage provider worker ID
+  -u, --apiUrl=apiUrl                                [default: ws://localhost:9944] Runtime API URL. Mandatory in
+                                                     non-dev environment.
 
-  -y, --accountUri=accountUri                Account URI (optional). Has a priority over the keyFile and password flags.
-                                             Could be overriden by ACCOUNT_URI environment variable.
+  -w, --worker=worker                                (required) Storage provider worker ID
+
+  -y, --accountUri=accountUri                        Account URI (optional). Has a priority over the keyFile and
+                                                     password flags. Could be overriden by ACCOUNT_URI environment
+                                                     variable.
 ```
 
 _See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v2.0.0/src/commands/server.ts)_
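A minimal client-side sketch of the upload-auth flow described in the README section above, written against the assumption that the node exposes the `/authToken` route from the OpenAPI spec and that the returned token is attached to the subsequent upload as a request header (the exact base path and header name are not shown in this diff and are treated as assumptions):

```ts
// Hypothetical client-side sketch; endpoint base path and header name are assumptions.
import fetch from 'cross-fetch'

export async function getUploadToken(apiUrl: string, signedTokenRequest: unknown): Promise<string> {
  // The token request is signed with the member's key; the server returns a
  // short-lived, single-use token that must accompany the upload request.
  const response = await fetch(`${apiUrl}/authToken`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(signedTokenRequest),
  })
  if (!response.ok) {
    throw new Error(`Token request failed: ${response.status}`)
  }
  const { token } = await response.json()
  return token // attach this value as the upload request's auth header before it expires
}
```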

+ 5 - 2
storage-node-v2/package.json

@@ -9,8 +9,9 @@
   "bugs": "https://github.com/Joystream/joystream/issues",
   "dependencies": {
     "@apollo/client": "^3.3.21",
-    "@joystream/types": "^0.17.0",
+    "@elastic/ecs-winston-format": "^1.3.1",
     "@joystream/metadata-protobuf": "^1.0.0",
+    "@joystream/types": "^0.17.0",
     "@oclif/command": "^1",
     "@oclif/config": "^1",
     "@oclif/plugin-help": "^3",
@@ -38,6 +39,7 @@
     "express-openapi-validator": "4.12.4",
     "express-winston": "^4.1.0",
     "fast-folder-size": "^1.4.0",
+    "fast-safe-stringify": "^2.1.1",
     "file-type": "^16.5.0",
     "lodash": "^4.17.21",
     "multihashes": "^4.0.2",
@@ -140,7 +142,8 @@
     "api:edit": "openapi-editor --file ./src/api-spec/openapi.yaml --port 10021",
     "generate:types:graphql": "yarn graphql-codegen -c ./src/services/queryNode/codegen.yml",
     "generate:types:json-schema": "yarn ts-node ./src/services/metadata/generateTypes.ts",
-    "ensure": "yarn format && yarn lint --fix && yarn build"
+    "ensure": "yarn format && yarn lint --fix && yarn build",
+    "checks": "tsc --noEmit --pretty && prettier ./src --check && yarn lint"
   },
   "types": "lib/index.d.ts"
 }

+ 0 - 200
storage-node-v2/scripts/generate-test-data.ts

@@ -1,200 +0,0 @@
-#!/usr/bin/env ts-node
-
-import fs from 'fs'
-import path from 'path'
-const fsPromises = fs.promises
-import { Client, ClientConfig,QueryResult } from 'pg'
-import { exit } from 'process'
-
-async function doJob(): Promise<void> {
-  const uploadDirectory = '/Users/shamix/uploads5'
-  const fileSize = 1000
-
-  const objectNumber = 10000
-  const bagNumber = 10
-  const bucketNumber = 10
-
-  const urls = [
-    `http://localhost:3333/`,
-    `http://localhost:3334/`,
-    `http://localhost:3335/`,
-  ]
-
-  const updateDb = false
-  const generateFiles = true
-
-  if (updateDb) {
-    const config : ClientConfig = {
-      user: 'postgres',
-      password: 'postgres',
-      database: 'query_node_processor',
-      host: 'localhost'
-    }
-    const client = new Client(config)
-    await client.connect()
-
-    // Cleanup
-    await client.query('TRUNCATE storage_data_object')
-    await client.query('TRUNCATE storage_bucket CASCADE')
-    await client.query('TRUNCATE storage_bag CASCADE')
-    await client.query('TRUNCATE storage_bag_storage_bucket')
-  
-    // Generate objects
-    await createBags(client, bagNumber)
-    await createBuckets(client, bucketNumber)
-    await createBagBucketLinks(client)
-    await createBucketWorkerLinks(client)
-    await createBucketOperatorUrls(client, urls)
-    const dbTasks = createDataObjects(client, objectNumber)
-    await Promise.all(dbTasks)
-
-    await client.end()
-  }
-  
-  if (generateFiles) {
-    await createFiles(uploadDirectory, fileSize, objectNumber)
-  }
-}
-
-function createDataObjects(client: Client, objectNumber: number): Promise<QueryResult<any>>[] {
-  const tasks: any[] = []
-
-  const bagId = '1'
-  for(let i: number = 1; i <= objectNumber; i++){
-    const name = i.toString()
-
-    console.log(`Writing ${i} data object...`)
-
-    const dbTask = client.query(
-      `INSERT INTO storage_data_object(storage_bag_id, ipfs_hash, id, created_by_id, version, is_accepted, size) 
-       values(${bagId}, ${name}, ${name}, 'some', '1', true, 100)`
-    )
-
-    tasks.push(dbTask)
-  }
-
-  return tasks
-}
-
-async function createFiles(uploadDirectory: string, fileSize: number, objectNumber: number): Promise<void> {
-  const data = new Uint8Array(fileSize)
-  let tasks: any[] = []
-  for(let i: number = 1; i <= objectNumber; i++){
-    const name = i.toString()
-
-    console.log(`Writing ${i} file...`)
-
-    const fileTask = fsPromises.writeFile(
-      path.join(uploadDirectory, name), 
-      data
-    )
-
-    tasks.push(fileTask)
-
-    if (i % 100 === 0){
-      await Promise.all(tasks)
-      tasks.length = 0
-    }
-  }
-
-  if (tasks.length > 0) {
-    await Promise.all(tasks)
-  }
-}
-
-async function createBags(client: Client, bagNumber: number): Promise<void> {
-  for(let i: number = 1; i <= bagNumber; i++){
-    const name = i.toString()
-
-    console.log(`Writing ${i} bag...`)
-
-    await client.query(
-      `INSERT INTO storage_bag(id, created_by_id, version, owner) 
-       values(${name}, 'some', '1',  '{}')`
-    )
-  }
-}
-
-async function createBuckets(client: Client, bucketNumber: number): Promise<void> {
-  const missingWorkerId = `{"isTypeOf": "StorageBucketOperatorStatusMissing"}`
-  for(let i: number = 1; i <= bucketNumber; i++){
-    const name = i.toString()
-
-    console.log(`Writing ${i} bucket...`)
-
-    await client.query(
-      `INSERT INTO storage_bucket(id, created_by_id, version, operator_status, accepting_new_bags, data_objects_size_limit,data_object_count_limit) 
-       values(${name}, 'some', '1',  '${missingWorkerId}', true, 100000000, 100000000)`
-    )
-  }
-}
-
-
-async function createBagBucketLinks(client: Client): Promise<void> {
-    console.log(`Writing bag to bucket links...`)
-
-    // Bucket1 to Bag1
-    await client.query(
-      `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
-       values('1', '1')`
-    )
-    // Bucket2 to Bag1
-    await client.query(
-      `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
-       values('1', '2')`
-    )    
-    // Bucket3 to Bag1
-    await client.query(
-      `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
-       values('1', '3')`
-    )
-}
-
-async function createBucketWorkerLinks(client: Client): Promise<void> {
-    console.log(`Writing bucket worker links...`)
-
-    const assignedWorker0 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 0}`
-    const assignedWorker1 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 1}`
-    const assignedWorker2 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 2}`
-
-    // Bucket1 to Worker0
-    await client.query(
-      `UPDATE storage_bucket
-       SET operator_status = '${assignedWorker0}'
-       WHERE id = '1'`
-    )
-    // Bucket2 to Worker1
-    await client.query(
-      `UPDATE storage_bucket
-       SET operator_status = '${assignedWorker1}'
-       WHERE id = '2'`
-    )   
-     // Bucket3 to Worker2
-    await client.query(
-      `UPDATE storage_bucket
-       SET operator_status = '${assignedWorker2}'
-       WHERE id = '3'`
-    )
-}
-
-async function createBucketOperatorUrls(client: Client, urls: string[]): Promise<void> {
-    console.log(`Writing bucket operator URLs...`)
-
-    for (let i = 0; i < urls.length; i++) {
-      const bucketId = i + 1
-      const metadata = urls[i]
-
-      await client.query(
-        `UPDATE storage_bucket
-         SET operator_metadata = '${metadata}'
-         WHERE id = '${bucketId}'`
-      )
-    }
-}
-
-doJob().then(() => {
-  console.log('Done')
-}).catch((err) => {
-  console.log(err)
-  exit(1)
-})

+ 4 - 3
storage-node-v2/scripts/init-dev-bucket.sh

@@ -11,7 +11,8 @@ CLI=../bin/run
 
 ${CLI} dev:init
 ${CLI} leader:update-bag-limit -l 7 --dev
-${CLI} leader:update-voucher-limits -o 100 -s 10000000 --dev
-BUCKET_ID=`${CLI} leader:create-bucket -i=0 -a -n=100 -s=10000000  --dev` 
+${CLI} leader:update-voucher-limits -o 10000 -s 1000000000 --dev
+BUCKET_ID=`${CLI} leader:create-bucket -i=0 -a -n=10000 -s=1000000000  --dev` 
 ${CLI} operator:accept-invitation -w=0 -i=${BUCKET_ID} --dev
-${CLI} leader:update-bag -a=${BUCKET_ID} -i static:council --dev 
+${CLI} leader:update-bag -a=${BUCKET_ID} -i static:council --dev 
+${CLI} operator:set-metadata -w 0 -i=${BUCKET_ID} -e http://localhost:3333 --dev

+ 3 - 2
storage-node-v2/scripts/run-all-commands.sh

@@ -24,8 +24,9 @@ BUCKET_ID=`${CLI} leader:create-bucket -i=0 --dev` # bucketId = 0
 ${CLI} operator:accept-invitation -w=0 -i=${BUCKET_ID} --dev
 ${CLI} leader:set-bucket-limits -i=${BUCKET_ID} -o=100 -s=10000000 --dev
 ${CLI} leader:update-bucket-status -i=${BUCKET_ID} --set on --dev
-${CLI} leader:update-bag -a=${BUCKET_ID} -i static:council --dev
-${CLI} operator:set-metadata -w=0 -i=${BUCKET_ID} -e="http://localhost:3333" --dev
+${CLI} leader:update-bag -a=${BUCKET_ID} -i static:council --dev 
+${CLI} operator:set-metadata -w=0 -i=${BUCKET_ID} -e=http://localhost:3333 --dev
+${CLI} operator:set-metadata -w=0 -i=${BUCKET_ID} -j=./operatorMetadata.json --dev
 
 # Create and delete a bucket
 BUCKET_ID=`${CLI} leader:create-bucket -a -n=100 -s=10000000  --dev` # bucketId = 1

+ 4 - 4
storage-node-v2/src/api-spec/openapi.yaml

@@ -23,7 +23,7 @@ tags:
 paths:
   /files/{id}:
     get:
-      operationId: publicApi.getFile
+      operationId: filesApi.getFile
       description: Returns a media file.
       tags:
         - files
@@ -75,7 +75,7 @@ paths:
         500:
           description: Unknown server error
     head:
-      operationId: publicApi.getFileHeaders
+      operationId: filesApi.getFileHeaders
       description: Returns a media file headers.
       tags:
         - files
@@ -100,7 +100,7 @@ paths:
       security:
         - UploadAuth: []
       description: Upload data
-      operationId: publicApi.uploadFile
+      operationId: filesApi.uploadFile
       tags:
         - files
       requestBody:
@@ -151,7 +151,7 @@ paths:
   /authToken:
     post:
       description: Get auth token from a server.
-      operationId: publicApi.authTokenForUploading
+      operationId: filesApi.authTokenForUploading
       tags:
         - files
       requestBody:

+ 1 - 0
storage-node-v2/src/command-base/ExitCodes.ts

@@ -10,6 +10,7 @@ enum ExitCodes {
   FileError,
   InvalidWorkerId,
   InvalidIntegerArray,
+  ServerError,
   ApiError = 200,
   UnsuccessfulRuntimeCall,
 }

+ 14 - 8
storage-node-v2/src/commands/dev/sync.ts

@@ -1,6 +1,7 @@
 import { Command, flags } from '@oclif/command'
 import { performSync } from '../../services/sync/synchronizer'
 import logger from '../../services/logger'
+import stringify from 'fast-safe-stringify'
 
 /**
  * CLI command:
@@ -9,7 +10,7 @@ import logger from '../../services/logger'
  *
  * @remarks
  * Should be run only during the development.
- * Shell command: "dev:upload"
+ * Shell command: "dev:sync"
  */
 export default class DevSync extends Command {
   static description =
@@ -27,15 +28,17 @@ export default class DevSync extends Command {
       required: false,
       description: 'Sync workers number (max async operations in progress).',
     }),
-    queryNodeHost: flags.string({
+    queryNodeEndpoint: flags.string({
       char: 'q',
       required: false,
       description: 'Query node host and port (e.g.: some.com:8081)',
+      default: 'http://localhost:8081/graphql',
     }),
     dataSourceOperatorUrl: flags.string({
       char: 'o',
       required: false,
       description: 'Storage node url base (e.g.: http://some.com:8081) to get data from.',
+      default: 'http://localhost:3333',
     }),
     uploads: flags.string({
       char: 'd',
@@ -49,17 +52,20 @@ export default class DevSync extends Command {
 
     logger.info('Syncing...')
 
-    const queryNodeHost = flags.queryNodeHost ?? 'localhost:8081'
-    const queryNodeUrl = `http://${queryNodeHost}/graphql`
     const syncWorkersNumber = flags.syncWorkersNumber ?? 20
-    const dataSourceOperatorHost = flags.dataSourceOperatorUrl ?? 'http://localhost:3333'
-    const operatorUrl = `${dataSourceOperatorHost}`
 
     try {
-      await performSync(flags.workerId, syncWorkersNumber, queryNodeUrl, flags.uploads, operatorUrl)
+      await performSync(
+        undefined,
+        flags.workerId,
+        syncWorkersNumber,
+        flags.queryNodeEndpoint,
+        flags.uploads,
+        flags.dataSourceOperatorUrl
+      )
     } catch (err) {
       logger.error(err)
-      logger.error(JSON.stringify(err, null, 2))
+      logger.error(stringify(err))
     }
   }
 }
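The error handler above switches from `JSON.stringify(err, null, 2)` to `fast-safe-stringify`, which serializes objects containing circular references (common in wrapped network and RPC errors) instead of throwing. A small sketch of the behaviour:

```ts
import stringify from 'fast-safe-stringify'

const err: Record<string, unknown> = { message: 'sync failed' }
err.self = err // circular reference, typical of wrapped network errors

// JSON.stringify(err) would throw "Converting circular structure to JSON".
console.log(stringify(err)) // => {"message":"sync failed","self":"[Circular]"}
```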

+ 59 - 38
storage-node-v2/src/commands/server.ts

@@ -2,6 +2,7 @@ import { flags } from '@oclif/command'
 import { createApp } from '../services/webApi/app'
 import ApiCommandBase from '../command-base/ApiCommandBase'
 import logger, { initElasticLogger } from '../services/logger'
+import { loadDataObjectIdCache } from '../services/caching/localDataObjects'
 import { ApiPromise } from '@polkadot/api'
 import { performSync, TempDirName } from '../services/sync/synchronizer'
 import sleep from 'sleep-promise'
@@ -13,6 +14,8 @@ import { KeyringPair } from '@polkadot/keyring/types'
 import ExitCodes from './../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'
 import { Worker } from '@joystream/types/working-group'
+import fs from 'fs'
+const fsPromises = fs.promises
 
 /**
  * CLI command:
@@ -50,10 +53,11 @@ export default class Server extends ApiCommandBase {
       description: 'Interval between synchronizations (in minutes)',
       default: 1,
     }),
-    queryNodeHost: flags.string({
+    queryNodeEndpoint: flags.string({
       char: 'q',
       required: true,
-      description: 'Query node host and port (e.g.: some.com:8081)',
+      default: 'http://localhost:8081/graphql',
+      description: 'Query node endpoint (e.g.: http://some.com:8081/graphql)',
     }),
     syncWorkersNumber: flags.integer({
       char: 'r',
@@ -61,10 +65,10 @@ export default class Server extends ApiCommandBase {
       description: 'Sync workers number (max async operations in progress).',
       default: 20,
     }),
-    elasticSearchHost: flags.string({
+    elasticSearchEndpoint: flags.string({
       char: 'e',
       required: false,
-      description: `Elasticsearch host and port (e.g.: some.com:8081).
+      description: `Elasticsearch endpoint (e.g.: http://some.com:8081).
 Log level could be set using the ELASTIC_LOG_LEVEL enviroment variable.
 Supported values: warn, error, debug, info. Default:debug`,
     }),
@@ -79,16 +83,17 @@ Supported values: warn, error, debug, info. Default:debug`,
   async run(): Promise<void> {
     const { flags } = this.parse(Server)
 
-    await removeTempDirectory(flags.uploads, TempDirName)
+    await recreateTempDirectory(flags.uploads, TempDirName)
 
-    let elasticUrl
-    if (!_.isEmpty(flags.elasticSearchHost)) {
-      elasticUrl = `http://${flags.elasticSearchHost}`
-      initElasticLogger(elasticUrl)
+    if (fs.existsSync(flags.uploads)) {
+      await loadDataObjectIdCache(flags.uploads, TempDirName)
     }
 
-    const queryNodeUrl = `http://${flags.queryNodeHost}/graphql`
-    logger.info(`Query node endpoint set: ${queryNodeUrl}`)
+    if (!_.isEmpty(flags.elasticSearchEndpoint)) {
+      initElasticLogger(flags.elasticSearchEndpoint ?? '')
+    }
+
+    logger.info(`Query node endpoint set: ${flags.queryNodeEndpoint}`)
 
     if (flags.dev) {
       await this.ensureDevelopmentChain()
@@ -98,39 +103,53 @@ Supported values: warn, error, debug, info. Default:debug`,
       logger.warn(`Uploading auth-schema disabled.`)
     }
 
+    const api = await this.getApi()
+
     if (flags.sync) {
       logger.info(`Synchronization enabled.`)
-
-      runSyncWithInterval(flags.worker, queryNodeUrl, flags.uploads, flags.syncWorkersNumber, flags.syncInterval)
+      setTimeout(
+        async () =>
+          runSyncWithInterval(
+            api,
+            flags.worker,
+            flags.queryNodeEndpoint,
+            flags.uploads,
+            TempDirName,
+            flags.syncWorkersNumber,
+            flags.syncInterval
+          ),
+        0
+      )
     }
 
-    const account = this.getAccount(flags)
-    const api = await this.getApi()
+    const storageProviderAccount = this.getAccount(flags)
 
-    await verifyWorkerId(api, flags.worker, account)
+    await verifyWorkerId(api, flags.worker, storageProviderAccount)
 
     try {
       const port = flags.port
-      const workerId = flags.worker ?? 0
+      const workerId = flags.worker
       const maxFileSize = await api.consts.storage.maxDataObjectSize.toNumber()
+      const tempFileUploadingDir = path.join(flags.uploads, TempDirName)
       logger.debug(`Max file size runtime parameter: ${maxFileSize}`)
 
       const app = await createApp({
         api,
-        account,
+        storageProviderAccount,
         workerId,
         maxFileSize,
         uploadsDir: flags.uploads,
-        tempDirName: TempDirName,
+        tempFileUploadingDir,
         process: this.config,
-        queryNodeUrl,
+        queryNodeEndpoint: flags.queryNodeEndpoint,
         enableUploadingAuth: !flags.disableUploadAuth,
-        elasticSearchEndpoint: elasticUrl,
+        elasticSearchEndpoint: flags.elasticSearchEndpoint,
       })
       logger.info(`Listening on http://localhost:${port}`)
       app.listen(port)
     } catch (err) {
       logger.error(`Server error: ${err}`)
+      this.exit(ExitCodes.ServerError)
     }
   }
 
@@ -145,52 +164,54 @@ Supported values: warn, error, debug, info. Default:debug`,
  * @param workerId - worker ID
  * @param queryNodeUrl - Query Node for data fetching
  * @param uploadsDir - data uploading directory
+ * @param tempDirectory - temporary data uploading directory
  * @param syncWorkersNumber - defines a number of the async processes for sync
  * @param syncIntervalMinutes - defines an interval between sync runs
  *
  * @returns void promise.
  */
-function runSyncWithInterval(
+async function runSyncWithInterval(
+  api: ApiPromise,
   workerId: number,
   queryNodeUrl: string,
   uploadsDirectory: string,
+  tempDirectory: string,
   syncWorkersNumber: number,
   syncIntervalMinutes: number
 ) {
-  setTimeout(async () => {
-    const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 1000
-
+  const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 1000
+  while (true) {
     logger.info(`Sync paused for ${syncIntervalMinutes} minute(s).`)
     await sleep(sleepIntevalInSeconds)
-    logger.info(`Resume syncing....`)
-
     try {
-      await performSync(workerId, syncWorkersNumber, queryNodeUrl, uploadsDirectory)
+      logger.info(`Resume syncing....`)
+      await performSync(api, workerId, syncWorkersNumber, queryNodeUrl, uploadsDirectory, tempDirectory)
     } catch (err) {
       logger.error(`Critical sync error: ${err}`)
     }
-
-    runSyncWithInterval(workerId, queryNodeUrl, uploadsDirectory, syncWorkersNumber, syncIntervalMinutes)
-  }, 0)
+  }
 }
 
 /**
- * Removes the temporary directory from the uploading directory.
+ * Removes and recreates the temporary directory from the uploading directory.
  * All files in the temp directory are deleted.
  *
- * @param uploadsDir - data uploading directory
+ * @param uploadsDirectory - data uploading directory
  * @param tempDirName - temporary directory name within the uploading directory
  * @returns void promise.
  */
-async function removeTempDirectory(uploadsDir: string, tempDirName: string): Promise<void> {
+async function recreateTempDirectory(uploadsDirectory: string, tempDirName: string): Promise<void> {
   try {
-    logger.info(`Removing temp directory ...`)
-    const tempFileUploadingDir = path.join(uploadsDir, tempDirName)
+    const tempFileUploadingDir = path.join(uploadsDirectory, tempDirName)
 
+    logger.info(`Removing temp directory ...`)
     const rimrafAsync = promisify(rimraf)
     await rimrafAsync(tempFileUploadingDir)
+
+    logger.info(`Creating temp directory ...`)
+    await fsPromises.mkdir(tempFileUploadingDir)
   } catch (err) {
-    logger.error(`Removing temp directory error: ${err}`)
+    logger.error(`Temp directory IO error: ${err}`)
   }
 }
 
@@ -205,7 +226,7 @@ async function removeTempDirectory(uploadsDir: string, tempDirName: string): Pro
  */
 async function verifyWorkerId(api: ApiPromise, workerId: number, account: KeyringPair): Promise<void> {
   // Cast Codec type to Worker type
-  const workerObj = (await api.query.storageWorkingGroup.workerById(workerId)) as unknown
+  const workerObj = await api.query.storageWorkingGroup.workerById(workerId)
   const worker = workerObj as Worker
 
   if (worker.role_account_id.toString() !== account.address) {

+ 84 - 0
storage-node-v2/src/services/caching/localDataObjects.ts

@@ -0,0 +1,84 @@
+import AwaitLock from 'await-lock'
+import path from 'path'
+import fs from 'fs'
+import logger from '../logger'
+const fsPromises = fs.promises
+
+// Local in-memory cache for IDs.
+let idCache = new Set<string>()
+
+const lock = new AwaitLock()
+
+/**
+ * Return the current ID cache.
+ *
+ * @returns ID array.
+ *
+ */
+export async function getDataObjectIDs(): Promise<string[]> {
+  await lock.acquireAsync()
+  const ids = Array.from(idCache)
+  lock.release()
+
+  return ids
+}
+
+/**
+ * Loads ID cache from the uploading directory.
+ *
+ * @returns empty promise.
+ *
+ * @param uploadDir - uploading directory
+ * @param tempDirName - temp directory name
+ */
+export async function loadDataObjectIdCache(uploadDir: string, tempDirName: string): Promise<void> {
+  await lock.acquireAsync()
+
+  const localIds = await getLocalFileNames(uploadDir)
+  // Filter temporary directory name.
+  const tempDirectoryName = path.parse(tempDirName).name
+  const ids = localIds.filter((dataObjectId) => dataObjectId !== tempDirectoryName)
+
+  idCache = new Set(ids)
+
+  logger.debug(`Local ID cache loaded.`)
+
+  lock.release()
+}
+
+/**
+ * Adds data object ID to the local cache.
+ *
+ * @param dataObjectId - uploading directory
+ *
+ * @returns empty promise.
+ */
+export async function addDataObjectIdToCache(dataObjectId: string): Promise<void> {
+  await lock.acquireAsync()
+
+  idCache.add(dataObjectId)
+
+  lock.release()
+}
+
+/**
+ * Deletes data object ID from the local cache.
+ *
+ * @param dataObjectId - uploading directory
+ */
+export async function deleteDataObjectIdFromCache(dataObjectId: string): Promise<void> {
+  await lock.acquireAsync()
+
+  idCache.delete(dataObjectId)
+
+  lock.release()
+}
+
+/**
+ * Returns file names from the local directory.
+ *
+ * @param directory - local directory to get file names from
+ */
+function getLocalFileNames(directory: string): Promise<string[]> {
+  return fsPromises.readdir(directory)
+}
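A short usage sketch for the new in-memory data object ID cache above; the `AwaitLock` serializes access so concurrent HTTP handlers and the synchronizer see a consistent ID set. The import path, directory, and IDs below are illustrative:

```ts
import {
  loadDataObjectIdCache,
  addDataObjectIdToCache,
  deleteDataObjectIdFromCache,
  getDataObjectIDs,
} from './services/caching/localDataObjects' // import path is illustrative

async function example(): Promise<void> {
  // Populate the cache from files already present in the uploads directory,
  // skipping the temp sub-directory (paths are illustrative).
  await loadDataObjectIdCache('/data/uploads', 'temp')

  // Register a freshly accepted object and drop a deleted one.
  await addDataObjectIdToCache('123')
  await deleteDataObjectIdFromCache('99')

  // Callers can now answer "which objects do I hold?" without touching the filesystem.
  const ids = await getDataObjectIDs()
  console.log(`${ids.length} objects tracked locally`)
}
```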

+ 35 - 0
storage-node-v2/src/services/caching/newUploads.ts

@@ -0,0 +1,35 @@
+import NodeCache from 'node-cache'
+
+// Expiration period in seconds for the new uploads data.
+const ExpirationPeriod = 3600 // seconds (1 hour)
+
+// Max ID number in local cache
+const MaxEntries = 100000
+
+// Local in-memory cache for new data objects.
+const newDataObjects = new NodeCache({
+  stdTTL: ExpirationPeriod,
+  deleteOnExpire: true,
+  maxKeys: MaxEntries,
+})
+
+/**
+ * Adds a data object ID to the cache for new data objects with expiration time.
+ *
+ * * @param dataObjectId - data object ID.
+ *
+ * @returns nonce string.
+ */
+export function registerNewDataObjectId(dataObjectId: string): void {
+  newDataObjects.set(dataObjectId, null, ExpirationPeriod)
+}
+
+/**
+ * Verifies that a data object with provided ID was recently uploaded .
+ *
+ * @param dataObjectId - data object ID.
+ * @returns true if ID was present in local cache.
+ */
+export function isNewDataObject(dataObjectId: string): boolean {
+  return newDataObjects.has(dataObjectId)
+}
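Usage sketch for the new-uploads cache above: IDs are kept for an hour (up to 100000 entries), which lets request handlers treat a just-accepted object as locally present before the query node has indexed it. The object ID is illustrative:

```ts
import { registerNewDataObjectId, isNewDataObject } from './services/caching/newUploads' // import path is illustrative

// Right after an upload is accepted, remember its ID for up to an hour.
registerNewDataObjectId('456')

// Later, e.g. while serving a file request:
if (isNewDataObject('456')) {
  // Serve the file from the local uploads directory even though the
  // query node may not have indexed the object yet.
}
```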

+ 0 - 0
storage-node-v2/src/services/helpers/tokenNonceKeeper.ts → storage-node-v2/src/services/caching/tokenNonceKeeper.ts


+ 0 - 0
storage-node-v2/src/services/runtime/transactionNonceKeeper.ts → storage-node-v2/src/services/caching/transactionNonceKeeper.ts


+ 4 - 3
storage-node-v2/src/services/helpers/auth.ts

@@ -2,6 +2,7 @@ import { KeyringPair } from '@polkadot/keyring/types'
 import { u8aToHex } from '@polkadot/util'
 import { signatureVerify } from '@polkadot/util-crypto'
 import base64url from 'base64url'
+import stringify from 'fast-safe-stringify'
 
 /**
  * Represents an upload token request.
@@ -97,7 +98,7 @@ export function parseUploadToken(tokenString: string): UploadToken {
  * @returns The UploadToken instance.
  */
 export function verifyTokenSignature(token: UploadToken | UploadTokenRequest, address: string): boolean {
-  const message = JSON.stringify(token.data)
+  const message = stringify(token.data)
   const { isValid } = signatureVerify(message, token.signature, address)
 
   return isValid
@@ -111,7 +112,7 @@ export function verifyTokenSignature(token: UploadToken | UploadTokenRequest, ad
  * @returns object signature.
  */
 export function signTokenBody(tokenBody: UploadTokenBody | UploadTokenRequestBody, account: KeyringPair): string {
-  const message = JSON.stringify(tokenBody)
+  const message = stringify(tokenBody)
   const signature = u8aToHex(account.sign(message))
 
   return signature
@@ -132,5 +133,5 @@ export function createUploadToken(tokenBody: UploadTokenBody, account: KeyringPa
     signature,
   }
 
-  return base64url.encode(JSON.stringify(token))
+  return base64url.encode(stringify(token))
 }
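The token helpers above now serialize the signed payload with `fast-safe-stringify` instead of `JSON.stringify`. A self-contained sketch of the sign/verify round trip using the same Polkadot primitives (the payload shape below is illustrative, not the actual `UploadTokenBody`):

```ts
import { Keyring } from '@polkadot/keyring'
import { cryptoWaitReady, signatureVerify } from '@polkadot/util-crypto'
import { u8aToHex } from '@polkadot/util'
import stringify from 'fast-safe-stringify'

async function demo(): Promise<void> {
  await cryptoWaitReady()
  const account = new Keyring({ type: 'sr25519' }).addFromUri('//Alice')

  // Signer and verifier must serialize the body identically, hence the shared stringify.
  const body = { dataObjectId: 1, storageBucketId: 0, bagId: 'static:council' } // illustrative payload
  const message = stringify(body)
  const signature = u8aToHex(account.sign(message))

  const { isValid } = signatureVerify(message, signature, account.address)
  console.log(isValid) // true
}

demo()
```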

+ 9 - 21
storage-node-v2/src/services/helpers/bagTypes.ts

@@ -1,8 +1,6 @@
 import { BagId, DynamicBagType, DynamicBagTypeKey, Static, Dynamic } from '@joystream/types/storage'
 import { WorkingGroup } from '@joystream/types/common'
-import { registry } from '@joystream/types'
-import { createType } from '@polkadot/types'
-import { DetectCodec, Codec } from '@polkadot/types/types'
+import { createType } from '@joystream/types'
 import ExitCodes from '../../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'
 
@@ -29,7 +27,7 @@ export class BagIdValidationError extends CLIError {
  * @returns The DynamicBagType instance.
  */
 export function parseDynamicBagType(bagType: DynamicBagTypeKey): DynamicBagType {
-  return createJoystreamType('DynamicBagType', bagType)
+  return createType('DynamicBagType', bagType)
 }
 
 /**
@@ -89,8 +87,8 @@ class BagIdParser {
     // Try to construct static council bag ID.
     if (this.bagIdParts[1] === 'council') {
       if (this.bagIdParts.length === 2) {
-        const staticBagId: Static = createJoystreamType('Static', 'Council')
-        const constructedBagId: BagId = createJoystreamType('BagId', {
+        const staticBagId: Static = createType('Static', 'Council')
+        const constructedBagId: BagId = createType('BagId', {
           'Static': staticBagId,
         })
 
@@ -106,11 +104,11 @@ class BagIdParser {
 
         for (const group of groups) {
           if (group.toLowerCase() === actualGroup) {
-            const workingGroup: WorkingGroup = createJoystreamType('WorkingGroup', group)
-            const staticBagId: Static = createJoystreamType('Static', {
+            const workingGroup: WorkingGroup = createType('WorkingGroup', group)
+            const staticBagId: Static = createType('Static', {
               'WorkingGroup': workingGroup,
             })
-            const constructedBagId: BagId = createJoystreamType('BagId', {
+            const constructedBagId: BagId = createType('BagId', {
               'Static': staticBagId,
             })
 
@@ -142,8 +140,8 @@ class BagIdParser {
             const dynamic = {} as Record<DynamicBagTypeKey, number>
             dynamic[dynamicBagType as DynamicBagTypeKey] = parsedId
 
-            const dynamicBagId: Dynamic = createJoystreamType('Dynamic', dynamic)
-            const constructedBagId: BagId = createJoystreamType('BagId', {
+            const dynamicBagId: Dynamic = createType('Dynamic', dynamic)
+            const constructedBagId: BagId = createType('BagId', {
               'Dynamic': dynamicBagId,
             })
 
@@ -156,13 +154,3 @@ class BagIdParser {
     throw new BagIdValidationError(`Invalid dynamic bagId: ${this.bagId}`)
   }
 }
-
-/**
- * Creates Joystream type using type registry.
- */
-function createJoystreamType<T extends Codec = Codec, K extends string = string>(
-  type: K,
-  value: unknown
-): DetectCodec<T, K> {
-  return createType(registry, type, value)
-}

+ 11 - 22
storage-node-v2/src/services/logger.ts

@@ -1,13 +1,9 @@
 import winston, { transport } from 'winston'
+import ecsformat from '@elastic/ecs-winston-format'
 import expressWinston from 'express-winston'
 import { Handler, ErrorRequestHandler } from 'express'
 import { ElasticsearchTransport } from 'winston-elasticsearch'
 
-/**
- * ElasticSearch server date format.
- */
-const elasticDateFormat = 'YYYY-MM-DDTHH:mm:ss'
-
 /**
  * Possible log levels.
  */
@@ -50,12 +46,11 @@ function createDefaultLoggerOptions(): winston.LoggerOptions {
   )
 
   // Redirect all logs to the stderr
-  const transports = [new winston.transports.Console({ stderrLevels: Object.keys(levels) })]
+  const transports = [new winston.transports.Console({ stderrLevels: Object.keys(levels), format })]
 
   return {
     level: level(),
     levels,
-    format,
     transports,
   }
 }
@@ -95,7 +90,14 @@ export default proxy
  *
  */
 export function httpLogger(elasticSearchEndpoint?: string): Handler {
-  const transports: winston.transport[] = [new winston.transports.Console()]
+  // ElasticSearch server date format.
+  const elasticDateFormat = 'YYYY-MM-DDTHH:mm:ss'
+
+  const transports: winston.transport[] = [
+    new winston.transports.Console({
+      format: winston.format.combine(winston.format.timestamp({ format: elasticDateFormat }), winston.format.json()),
+    }),
+  ]
 
   if (elasticSearchEndpoint) {
     const esTransport = createElasticTransport(elasticSearchEndpoint)
@@ -104,7 +106,6 @@ export function httpLogger(elasticSearchEndpoint?: string): Handler {
 
   const opts: expressWinston.LoggerOptions = {
     transports,
-    format: winston.format.combine(winston.format.timestamp({ format: elasticDateFormat }), winston.format.json()),
     meta: true,
     msg: 'HTTP {{req.method}} {{req.url}}',
     expressFormat: true,
@@ -134,13 +135,6 @@ export function errorLogger(): ErrorRequestHandler {
  *
  */
 export function createStdConsoleLogger(): winston.Logger {
-  const levels = {
-    error: 0,
-    warn: 1,
-    info: 2,
-    http: 3,
-    debug: 4,
-  }
   const format = winston.format.printf((info) => `${info.message}`)
 
   const transports = [new winston.transports.Console()]
@@ -160,12 +154,6 @@ export function createStdConsoleLogger(): winston.Logger {
 function createElasticLogger(elasticSearchEndpoint: string): winston.Logger {
   const loggerOptions = createDefaultLoggerOptions()
 
-  // Formats
-  loggerOptions.format = winston.format.combine(
-    winston.format.timestamp({ format: elasticDateFormat }),
-    winston.format.printf((info) => `${info.timestamp} ${info.level}: ${info.message}`)
-  )
-
   // Transports
   let transports: transport[] = []
   if (loggerOptions.transports !== undefined) {
@@ -217,6 +205,7 @@ function createElasticTransport(elasticSearchEndpoint: string): winston.transpor
     level: elasticLogLevel,
     clientOpts: { node: elasticSearchEndpoint, maxRetries: 5 },
     index: 'storage-node',
+    format: ecsformat(),
   }
   return new ElasticsearchTransport(esTransportOpts)
 }

+ 2 - 1
storage-node-v2/src/services/metadata/validationService.ts

@@ -1,5 +1,6 @@
 import Ajv from 'ajv'
 import { SchemaKey, schemas, TypeBySchemaKey } from './schemas'
+import stringify from 'fast-safe-stringify'
 
 /**
  * JSON schema validation error
@@ -28,7 +29,7 @@ export class ValidationService {
     if (!valid) {
       throw new ValidationError(
         `${schemaKey} is not valid`,
-        this.ajv.errors?.map((e) => `${e.dataPath}: ${e.message} (${JSON.stringify(e.params)})`) || []
+        this.ajv.errors?.map((e) => `${e.dataPath}: ${e.message} (${stringify(e.params)})`) || []
       )
     }
     return input as TypeBySchemaKey<SK>

+ 126 - 34
storage-node-v2/src/services/queryNode/api.ts

@@ -1,21 +1,47 @@
 import { ApolloClient, NormalizedCacheObject, HttpLink, InMemoryCache, DocumentNode } from '@apollo/client'
 import fetch from 'cross-fetch'
 import {
+  GetBagConnection,
+  GetBagConnectionQuery,
+  GetBagConnectionQueryVariables,
   GetStorageBucketDetails,
   GetStorageBucketDetailsQuery,
+  GetStorageBucketDetailsByWorkerIdQuery,
+  GetStorageBucketDetailsByWorkerIdQueryVariables,
   GetStorageBucketDetailsQueryVariables,
   StorageBucketDetailsFragment,
-  GetStorageBagDetailsQuery,
-  GetStorageBagDetails,
   StorageBagDetailsFragment,
-  GetStorageBagDetailsQueryVariables,
   DataObjectDetailsFragment,
-  GetDataObjectDetailsQuery,
-  GetDataObjectDetailsQueryVariables,
-  GetDataObjectDetails,
+  GetDataObjectConnectionQuery,
+  GetDataObjectConnectionQueryVariables,
+  GetDataObjectConnection,
+  StorageBucketIdsFragment,
+  GetStorageBucketsConnection,
+  GetStorageBucketsConnectionQuery,
+  GetStorageBucketsConnectionQueryVariables,
 } from './generated/queries'
 import { Maybe, StorageBagWhereInput } from './generated/schema'
 
+import logger from '../logger'
+
+/**
+ * Defines query paging limits.
+ */
+export const MAX_RESULTS_PER_QUERY = 1000
+
+type PaginationQueryVariables = {
+  limit: number
+  lastCursor?: Maybe<string>
+}
+
+type PaginationQueryResult<T = unknown> = {
+  edges: { node: T }[]
+  pageInfo: {
+    hasNextPage: boolean
+    endCursor?: Maybe<string>
+  }
+}
+
 /**
  * Query node class helper. Incapsulates custom queries.
  *
@@ -28,7 +54,7 @@ export class QueryNodeApi {
       link: new HttpLink({ uri: endpoint, fetch }),
       cache: new InMemoryCache(),
       defaultOptions: {
-        query: { fetchPolicy: 'no-cache', errorPolicy: 'all' },
+        query: { fetchPolicy: 'no-cache', errorPolicy: 'none' },
       },
     })
   }
@@ -76,6 +102,40 @@ export class QueryNodeApi {
     return result.data[resultKey][0]
   }
 
+  protected async multipleEntitiesWithPagination<
+    NodeT,
+    QueryT extends { [k: string]: PaginationQueryResult<NodeT> },
+    CustomVariablesT extends Record<string, unknown>
+  >(
+    query: DocumentNode,
+    variables: CustomVariablesT,
+    resultKey: keyof QueryT,
+    itemsPerPage = MAX_RESULTS_PER_QUERY
+  ): Promise<NodeT[]> {
+    let hasNextPage = true
+    let results: NodeT[] = []
+    let lastCursor: string | undefined
+    while (hasNextPage) {
+      const paginationVariables = { limit: itemsPerPage, cursor: lastCursor }
+      const queryVariables = { ...variables, ...paginationVariables }
+      logger.debug(`Query - ${resultKey}`)
+      const result = await this.apolloClient.query<QueryT, PaginationQueryVariables & CustomVariablesT>({
+        query,
+        variables: queryVariables,
+      })
+
+      if (!result?.data) {
+        return results
+      }
+
+      const page = result.data[resultKey]
+      results = results.concat(page.edges.map((e) => e.node))
+      hasNextPage = page.pageInfo.hasNextPage
+      lastCursor = page.pageInfo.endCursor || undefined
+    }
+    return results
+  }
+
   /**
    * Query-node: get multiple entities
    *
@@ -95,20 +155,45 @@ export class QueryNodeApi {
     if (result?.data === null) {
       return null
     }
+
     return result.data[resultKey]
   }
 
+  /**
+   * Returns storage bucket IDs filtered by worker ID.
+   *
+   * @param workerId - worker ID
+   */
+  public async getStorageBucketDetailsByWorkerId(workerId: string): Promise<Array<StorageBucketIdsFragment>> {
+    const result = await this.multipleEntitiesWithPagination<
+      StorageBucketIdsFragment,
+      GetStorageBucketDetailsByWorkerIdQuery,
+      GetStorageBucketDetailsByWorkerIdQueryVariables
+    >(GetStorageBucketsConnection, { workerId, limit: MAX_RESULTS_PER_QUERY }, 'storageBucketsConnection')
+
+    if (!result) {
+      return []
+    }
+
+    return result
+  }
+
   /**
    * Returns storage bucket info by pages.
    *
+   * @param ids - bucket IDs to fetch
    * @param offset - starting record of the page
    * @param limit - page size
    */
-  public async getStorageBucketDetails(offset: number, limit: number): Promise<Array<StorageBucketDetailsFragment>> {
+  public async getStorageBucketDetails(
+    ids: string[],
+    offset: number,
+    limit: number
+  ): Promise<Array<StorageBucketDetailsFragment>> {
     const result = await this.multipleEntitiesQuery<
       GetStorageBucketDetailsQuery,
       GetStorageBucketDetailsQueryVariables
-    >(GetStorageBucketDetails, { offset, limit }, 'storageBuckets')
+    >(GetStorageBucketDetails, { offset, limit, ids }, 'storageBuckets')
 
     if (result === null) {
       return []
@@ -121,21 +206,15 @@ export class QueryNodeApi {
    * Returns storage bag info by pages for the given buckets.
    *
    * @param bucketIds - query filter: bucket IDs
-   * @param offset - starting record of the page
-   * @param limit - page size
    */
-  public async getStorageBagsDetails(
-    bucketIds: string[],
-    offset: number,
-    limit: number
-  ): Promise<Array<StorageBagDetailsFragment>> {
-    const result = await this.multipleEntitiesQuery<GetStorageBagDetailsQuery, GetStorageBagDetailsQueryVariables>(
-      GetStorageBagDetails,
-      { offset, limit, bucketIds },
-      'storageBags'
-    )
+  public async getStorageBagsDetails(bucketIds: string[]): Promise<Array<StorageBagDetailsFragment>> {
+    const result = await this.multipleEntitiesWithPagination<
+      StorageBagDetailsFragment,
+      GetBagConnectionQuery,
+      GetBagConnectionQueryVariables
+    >(GetBagConnection, { limit: MAX_RESULTS_PER_QUERY, bucketIds }, 'storageBagsConnection')
 
-    if (result === null) {
+    if (!result) {
       return []
     }
 
@@ -147,21 +226,34 @@ export class QueryNodeApi {
    *
    * @param bagIds - query filter: bag IDs
    * @param offset - starting record of the page
-   * @param limit - page size
    */
-  public async getDataObjectDetails(
-    bagIds: string[],
-    offset: number,
-    limit: number
-  ): Promise<Array<DataObjectDetailsFragment>> {
+  public async getDataObjectDetails(bagIds: string[]): Promise<Array<DataObjectDetailsFragment>> {
     const input: StorageBagWhereInput = { id_in: bagIds }
-    const result = await this.multipleEntitiesQuery<GetDataObjectDetailsQuery, GetDataObjectDetailsQueryVariables>(
-      GetDataObjectDetails,
-      { offset, limit, bagIds: input },
-      'storageDataObjects'
-    )
+    const result = await this.multipleEntitiesWithPagination<
+      DataObjectDetailsFragment,
+      GetDataObjectConnectionQuery,
+      GetDataObjectConnectionQueryVariables
+    >(GetDataObjectConnection, { limit: MAX_RESULTS_PER_QUERY, bagIds: input }, 'storageDataObjectsConnection')
 
-    if (result === null) {
+    if (!result) {
+      return []
+    }
+
+    return result
+  }
+
+  /**
+   * Returns storage bucket IDs.
+   *
+   */
+  public async getStorageBucketIds(): Promise<Array<StorageBucketIdsFragment>> {
+    const result = await this.multipleEntitiesWithPagination<
+      StorageBucketIdsFragment,
+      GetStorageBucketsConnectionQuery,
+      GetStorageBucketsConnectionQueryVariables
+    >(GetStorageBucketsConnection, { limit: MAX_RESULTS_PER_QUERY }, 'storageBucketsConnection')
+
+    if (!result) {
       return []
     }
 

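The `QueryNodeApi` changes above replace offset/limit queries with Relay-style connection queries: `multipleEntitiesWithPagination` follows `pageInfo.endCursor` until `hasNextPage` is false, fetching `MAX_RESULTS_PER_QUERY` (1000) records per page. A hedged usage sketch, assuming the constructor takes the GraphQL endpoint URL as in the diff and that the endpoint below is illustrative:

```ts
import { QueryNodeApi } from './services/queryNode/api' // import path is illustrative

async function listObligations(): Promise<void> {
  const api = new QueryNodeApi('http://localhost:8081/graphql') // illustrative endpoint

  // Buckets operated by worker 0, then their bags, then accepted data objects;
  // each call pages through the connection transparently in 1000-record chunks.
  const buckets = await api.getStorageBucketDetailsByWorkerId('0')
  const bags = await api.getStorageBagsDetails(buckets.map((b) => b.id))
  const objects = await api.getDataObjectDetails(bags.map((b) => b.id))

  console.log(`${objects.length} data objects assigned to this operator`)
}
```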
+ 138 - 21
storage-node-v2/src/services/queryNode/generated/queries.ts

@@ -1,6 +1,35 @@
 import * as Types from './schema'
 
 import gql from 'graphql-tag'
+export type StorageBucketIdsFragment = { id: string }
+
+export type GetStorageBucketsConnectionQueryVariables = Types.Exact<{
+  limit?: Types.Maybe<Types.Scalars['Int']>
+  cursor?: Types.Maybe<Types.Scalars['String']>
+}>
+
+export type GetStorageBucketsConnectionQuery = {
+  storageBucketsConnection: {
+    totalCount: number
+    edges: Array<{ cursor: string; node: StorageBucketIdsFragment }>
+    pageInfo: { hasNextPage: boolean; endCursor?: Types.Maybe<string> }
+  }
+}
+
+export type GetStorageBucketDetailsByWorkerIdQueryVariables = Types.Exact<{
+  workerId?: Types.Maybe<Types.Scalars['ID']>
+  limit?: Types.Maybe<Types.Scalars['Int']>
+  cursor?: Types.Maybe<Types.Scalars['String']>
+}>
+
+export type GetStorageBucketDetailsByWorkerIdQuery = {
+  storageBucketsConnection: {
+    totalCount: number
+    edges: Array<{ cursor: string; node: StorageBucketIdsFragment }>
+    pageInfo: { hasNextPage: boolean; endCursor?: Types.Maybe<string> }
+  }
+}
+
 export type StorageBucketDetailsFragment = {
   id: string
   operatorMetadata?: Types.Maybe<{ id: string; nodeEndpoint?: Types.Maybe<string> }>
@@ -8,32 +37,58 @@ export type StorageBucketDetailsFragment = {
 }
 
 export type GetStorageBucketDetailsQueryVariables = Types.Exact<{
+  ids?: Types.Maybe<Array<Types.Scalars['ID']> | Types.Scalars['ID']>
   offset?: Types.Maybe<Types.Scalars['Int']>
   limit?: Types.Maybe<Types.Scalars['Int']>
 }>
 
 export type GetStorageBucketDetailsQuery = { storageBuckets: Array<StorageBucketDetailsFragment> }
 
-export type StorageBagDetailsFragment = { id: string; storageAssignments: Array<{ storageBucket: { id: string } }> }
+export type StorageBagDetailsFragment = { id: string; storageBuckets: Array<{ id: string }> }
 
 export type GetStorageBagDetailsQueryVariables = Types.Exact<{
-  bucketIds?: Types.Maybe<Array<Types.Scalars['String']> | Types.Scalars['String']>
+  bucketIds?: Types.Maybe<Array<Types.Scalars['ID']> | Types.Scalars['ID']>
   offset?: Types.Maybe<Types.Scalars['Int']>
   limit?: Types.Maybe<Types.Scalars['Int']>
 }>
 
 export type GetStorageBagDetailsQuery = { storageBags: Array<StorageBagDetailsFragment> }
 
+export type GetBagConnectionQueryVariables = Types.Exact<{
+  bucketIds?: Types.Maybe<Array<Types.Scalars['ID']> | Types.Scalars['ID']>
+  limit?: Types.Maybe<Types.Scalars['Int']>
+  cursor?: Types.Maybe<Types.Scalars['String']>
+}>
+
+export type GetBagConnectionQuery = {
+  storageBagsConnection: {
+    totalCount: number
+    edges: Array<{ cursor: string; node: StorageBagDetailsFragment }>
+    pageInfo: { hasNextPage: boolean; endCursor?: Types.Maybe<string> }
+  }
+}
+
 export type DataObjectDetailsFragment = { id: string; storageBagId: string }
 
-export type GetDataObjectDetailsQueryVariables = Types.Exact<{
+export type GetDataObjectConnectionQueryVariables = Types.Exact<{
   bagIds?: Types.Maybe<Types.StorageBagWhereInput>
-  offset?: Types.Maybe<Types.Scalars['Int']>
   limit?: Types.Maybe<Types.Scalars['Int']>
+  cursor?: Types.Maybe<Types.Scalars['String']>
 }>
 
-export type GetDataObjectDetailsQuery = { storageDataObjects: Array<DataObjectDetailsFragment> }
+export type GetDataObjectConnectionQuery = {
+  storageDataObjectsConnection: {
+    totalCount: number
+    edges: Array<{ cursor: string; node: DataObjectDetailsFragment }>
+    pageInfo: { hasNextPage: boolean; endCursor?: Types.Maybe<string> }
+  }
+}
 
+export const StorageBucketIds = gql`
+  fragment StorageBucketIds on StorageBucket {
+    id
+  }
+`
 export const StorageBucketDetails = gql`
   fragment StorageBucketDetails on StorageBucket {
     id
@@ -54,10 +109,8 @@ export const StorageBucketDetails = gql`
 export const StorageBagDetails = gql`
   fragment StorageBagDetails on StorageBag {
     id
-    storageAssignments {
-      storageBucket {
-        id
-      }
+    storageBuckets {
+      id
     }
   }
 `
@@ -67,30 +120,94 @@ export const DataObjectDetails = gql`
     storageBagId
   }
 `
+export const GetStorageBucketsConnection = gql`
+  query getStorageBucketsConnection($limit: Int, $cursor: String) {
+    storageBucketsConnection(first: $limit, after: $cursor) {
+      edges {
+        cursor
+        node {
+          ...StorageBucketIds
+        }
+      }
+      pageInfo {
+        hasNextPage
+        endCursor
+      }
+      totalCount
+    }
+  }
+  ${StorageBucketIds}
+`
+export const GetStorageBucketDetailsByWorkerId = gql`
+  query getStorageBucketDetailsByWorkerId($workerId: ID, $limit: Int, $cursor: String) {
+    storageBucketsConnection(
+      first: $limit
+      after: $cursor
+      where: { operatorStatus_json: { isTypeOf_eq: "StorageBucketOperatorStatusActive", workerId_eq: $workerId } }
+    ) {
+      edges {
+        cursor
+        node {
+          ...StorageBucketIds
+        }
+      }
+      pageInfo {
+        hasNextPage
+        endCursor
+      }
+      totalCount
+    }
+  }
+  ${StorageBucketIds}
+`
 export const GetStorageBucketDetails = gql`
-  query getStorageBucketDetails($offset: Int, $limit: Int) {
-    storageBuckets(offset: $offset, limit: $limit) {
+  query getStorageBucketDetails($ids: [ID!], $offset: Int, $limit: Int) {
+    storageBuckets(where: { id_in: $ids }, offset: $offset, limit: $limit) {
       ...StorageBucketDetails
     }
   }
   ${StorageBucketDetails}
 `
 export const GetStorageBagDetails = gql`
-  query getStorageBagDetails($bucketIds: [String!], $offset: Int, $limit: Int) {
-    storageBags(
-      offset: $offset
-      limit: $limit
-      where: { storageAssignments_some: { storageBucketId_in: $bucketIds } }
-    ) {
+  query getStorageBagDetails($bucketIds: [ID!], $offset: Int, $limit: Int) {
+    storageBags(offset: $offset, limit: $limit, where: { storageBuckets_some: { id_in: $bucketIds } }) {
       ...StorageBagDetails
     }
   }
   ${StorageBagDetails}
 `
-export const GetDataObjectDetails = gql`
-  query getDataObjectDetails($bagIds: StorageBagWhereInput, $offset: Int, $limit: Int) {
-    storageDataObjects(offset: $offset, limit: $limit, where: { storageBag: $bagIds, isAccepted_eq: true }) {
-      ...DataObjectDetails
+export const GetBagConnection = gql`
+  query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {
+    storageBagsConnection(first: $limit, after: $cursor, where: { storageBuckets_some: { id_in: $bucketIds } }) {
+      edges {
+        cursor
+        node {
+          ...StorageBagDetails
+        }
+      }
+      pageInfo {
+        hasNextPage
+        endCursor
+      }
+      totalCount
+    }
+  }
+  ${StorageBagDetails}
+`
+export const GetDataObjectConnection = gql`
+  query getDataObjectConnection($bagIds: StorageBagWhereInput, $limit: Int, $cursor: String) {
+    storageDataObjectsConnection(first: $limit, after: $cursor, where: { storageBag: $bagIds, isAccepted_eq: true }) {
+      edges {
+        cursor
+        node {
+          ...DataObjectDetails
+        }
+      }
+      pageInfo {
+        hasNextPage
+        endCursor
+      }
+      totalCount
     }
   }
   ${DataObjectDetails}

File diff suppressed because it is too large
+ 332 - 478
storage-node-v2/src/services/queryNode/generated/schema.ts


+ 76 - 11
storage-node-v2/src/services/queryNode/queries/queries.graphql

@@ -1,3 +1,44 @@
+# TODO: remove after issue fix: https://github.com/Joystream/joystream/issues/2811
+fragment StorageBucketIds on StorageBucket {
+  id
+}
+
+query getStorageBucketsConnection($limit: Int, $cursor: String) {
+  storageBucketsConnection(first: $limit, after: $cursor) {
+    edges {
+      cursor
+      node {
+        ...StorageBucketIds
+      }
+    }
+    pageInfo {
+      hasNextPage
+      endCursor
+    }
+    totalCount
+  }
+}
+
+query getStorageBucketDetailsByWorkerId($workerId: ID, $limit: Int, $cursor: String) {
+  storageBucketsConnection(
+    first: $limit
+    after: $cursor
+    where: { operatorStatus_json: { isTypeOf_eq: "StorageBucketOperatorStatusActive", workerId_eq: $workerId } }
+  ) {
+    edges {
+      cursor
+      node {
+        ...StorageBucketIds
+      }
+    }
+    pageInfo {
+      hasNextPage
+      endCursor
+    }
+    totalCount
+  }
+}
+
 fragment StorageBucketDetails on StorageBucket {
   id
   operatorMetadata {
@@ -14,34 +55,58 @@ fragment StorageBucketDetails on StorageBucket {
   }
 }
 
-query getStorageBucketDetails($offset: Int, $limit: Int) {
-  storageBuckets(offset: $offset, limit: $limit) {
+query getStorageBucketDetails($ids: [ID!], $offset: Int, $limit: Int) {
+  storageBuckets(where: { id_in: $ids }, offset: $offset, limit: $limit) {
     ...StorageBucketDetails
   }
 }
 
 fragment StorageBagDetails on StorageBag {
   id
-  storageAssignments {
-    storageBucket {
-      id
-    }
+  storageBuckets {
+    id
   }
 }
 
-query getStorageBagDetails($bucketIds: [String!], $offset: Int, $limit: Int) {
-  storageBags(offset: $offset, limit: $limit, where: { storageAssignments_some: { storageBucketId_in: $bucketIds } }) {
+query getStorageBagDetails($bucketIds: [ID!], $offset: Int, $limit: Int) {
+  storageBags(offset: $offset, limit: $limit, where: { storageBuckets_some: { id_in: $bucketIds } }) {
     ...StorageBagDetails
   }
 }
 
+query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {
+  storageBagsConnection(first: $limit, after: $cursor, where: { storageBuckets_some: { id_in: $bucketIds } }) {
+    edges {
+      cursor
+      node {
+        ...StorageBagDetails
+      }
+    }
+    pageInfo {
+      hasNextPage
+      endCursor
+    }
+    totalCount
+  }
+}
+
 fragment DataObjectDetails on StorageDataObject {
   id
   storageBagId
 }
 
-query getDataObjectDetails($bagIds: StorageBagWhereInput, $offset: Int, $limit: Int) {
-  storageDataObjects(offset: $offset, limit: $limit, where: { storageBag: $bagIds, isAccepted_eq: true }) {
-    ...DataObjectDetails
+query getDataObjectConnection($bagIds: StorageBagWhereInput, $limit: Int, $cursor: String) {
+  storageDataObjectsConnection(first: $limit, after: $cursor, where: { storageBag: $bagIds, isAccepted_eq: true }) {
+    edges {
+      cursor
+      node {
+        ...DataObjectDetails
+      }
+    }
+    pageInfo {
+      hasNextPage
+      endCursor
+    }
+    totalCount
   }
 }
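
These connection queries are meant to be drained page by page. The sketch below shows the generic loop such a consumer could use; the `runQuery` callback and the default page size are assumptions, only the `edges`/`pageInfo` shape comes from the queries above.

```typescript
type Connection<T> = {
  edges: { cursor: string; node: T }[]
  pageInfo: { hasNextPage: boolean; endCursor?: string | null }
  totalCount: number
}

// Drains a "...Connection" query by following endCursor until hasNextPage is false.
async function fetchAllPages<T>(
  runQuery: (limit: number, cursor?: string | null) => Promise<Connection<T>>,
  limit = 1000
): Promise<T[]> {
  const result: T[] = []
  let cursor: string | null | undefined = null
  let hasNextPage = true

  while (hasNextPage) {
    const page = await runQuery(limit, cursor)
    result.push(...page.edges.map((edge) => edge.node))
    cursor = page.pageInfo.endCursor
    hasNextPage = page.pageInfo.hasNextPage
  }

  return result
}
```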

+ 5 - 4
storage-node-v2/src/services/runtime/api.ts

@@ -6,10 +6,11 @@ import { TypeRegistry } from '@polkadot/types'
 import { KeyringPair } from '@polkadot/keyring/types'
 import { SubmittableExtrinsic, AugmentedEvent } from '@polkadot/api/types'
 import { DispatchError, DispatchResult } from '@polkadot/types/interfaces/system'
-import { getTransactionNonce, resetTransactionNonceCache } from './transactionNonceKeeper'
+import { getTransactionNonce, resetTransactionNonceCache } from '../caching/transactionNonceKeeper'
 import logger from '../../services/logger'
 import ExitCodes from '../../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'
+import stringify from 'fast-safe-stringify'
 
 /**
  * Dedicated error for the failed extrinsics.
@@ -24,12 +25,12 @@ export class ExtrinsicFailedError extends CLIError {}
  */
 export async function createApi(apiUrl: string): Promise<ApiPromise> {
   const provider = new WsProvider(apiUrl)
-  provider.on('error', (err) => logger.error(`Api provider error: ${err.target?._url}`))
+  provider.on('error', (err) => logger.error(`Api provider error: ${err.target?._url}`, { err }))
 
   const api = new ApiPromise({ provider, types })
   await api.isReadyOrError
 
-  api.on('error', (err) => logger.error(`Api promise error: ${err.target?._url}`))
+  api.on('error', (err) => logger.error(`Api promise error: ${err.target?._url}`, { err }))
 
   return api
 }
@@ -109,7 +110,7 @@ function sendExtrinsic(
       .then((unsubFunc) => (unsubscribe = unsubFunc))
       .catch((e) =>
         reject(
-          new ExtrinsicFailedError(`Cannot send the extrinsic: ${e.message ? e.message : JSON.stringify(e)}`, {
+          new ExtrinsicFailedError(`Cannot send the extrinsic: ${e.message ? e.message : stringify(e)}`, {
             exit: ExitCodes.ApiError,
           })
         )
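
The switch from `JSON.stringify` to `fast-safe-stringify` matters when the rejected value contains circular references, which plain `JSON.stringify` cannot serialize. A tiny illustration (the object shape is made up):

```typescript
import stringify from 'fast-safe-stringify'

const rejection: Record<string, unknown> = { reason: 'timeout' }
rejection.self = rejection // circular reference: JSON.stringify(rejection) would throw

console.log(stringify(rejection)) // -> {"reason":"timeout","self":"[Circular]"}
```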

+ 1 - 1
storage-node-v2/src/services/runtime/extrinsics.ts

@@ -273,7 +273,7 @@ export async function inviteStorageBucketOperator(
 async function extrinsicWrapper(
   extrinsic: () => Promise<void>,
   throwErr = false,
-  timeoutMs = 10000 // 10s - default extrinsic timeout
+  timeoutMs = 25000 // 25s - default extrinsic timeout
 ): Promise<boolean> {
   try {
     await timeout(extrinsic(), timeoutMs)
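
The `timeout` helper itself is not shown in this hunk; a `Promise.race`-based equivalent, assuming simple reject-after-N-ms semantics, illustrates what the wrapper does with the new 25-second default:

```typescript
// Illustrative only: rejects if the wrapped promise does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return Promise.race([
    promise,
    new Promise<T>((_, reject) => setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms)),
  ])
}

// Mirrors extrinsicWrapper's default: give the extrinsic 25 seconds to complete.
// await withTimeout(someExtrinsic(), 25000)
```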

+ 13 - 18
storage-node-v2/src/services/sync/storageObligations.ts

@@ -1,4 +1,4 @@
-import { QueryNodeApi } from '../queryNode/api'
+import { MAX_RESULTS_PER_QUERY, QueryNodeApi } from '../queryNode/api'
 import logger from '../logger'
 import {
   StorageBagDetailsFragment,
@@ -108,7 +108,7 @@ export async function getStorageObligationsFromRuntime(
     })),
     bags: assignedBags.map((bag) => ({
       id: bag.id,
-      buckets: bag.storageAssignments.map((bucketInBag) => bucketInBag.storageBucket.id),
+      buckets: bag.storageBuckets.map((bucketInBag) => bucketInBag.id),
     })),
     dataObjects: assignedDataObjects.map((dataObject) => ({
       id: dataObject.id,
@@ -128,13 +128,11 @@ export async function getStorageObligationsFromRuntime(
  */
 export async function getStorageBucketIdsByWorkerId(queryNodeUrl: string, workerId: number): Promise<string[]> {
   const api = new QueryNodeApi(queryNodeUrl)
-  const allBuckets = await getAllBuckets(api)
 
-  const bucketIds = allBuckets
-    .filter((bucket) => bucket.operatorStatus?.workerId === workerId)
-    .map((bucket) => bucket.id)
+  const idFragments = await api.getStorageBucketDetailsByWorkerId(workerId.toString())
+  const ids = idFragments.map((frag) => frag.id)
 
-  return bucketIds
+  return ids
 }
 
 /**
@@ -158,9 +156,12 @@ export async function getDataObjectIDsByBagId(queryNodeUrl: string, bagId: strin
  * @returns storage buckets data
  */
 async function getAllBuckets(api: QueryNodeApi): Promise<StorageBucketDetailsFragment[]> {
+  const idFragments = await api.getStorageBucketIds()
+  const ids = idFragments.map((frag) => frag.id)
+
   return await getAllObjectsWithPaging(
-    'all storage buckets',
-    async (offset, limit) => await api.getStorageBucketDetails(offset, limit)
+    'get all storage buckets',
+    async (offset, limit) => await api.getStorageBucketDetails(ids, offset, limit)
   )
 }
 
@@ -172,10 +173,7 @@ async function getAllBuckets(api: QueryNodeApi): Promise<StorageBucketDetailsFra
  * @returns storage bag data
  */
 async function getAllAssignedDataObjects(api: QueryNodeApi, bagIds: string[]): Promise<DataObjectDetailsFragment[]> {
-  return await getAllObjectsWithPaging(
-    'assigned data objects',
-    async (offset, limit) => await api.getDataObjectDetails(bagIds, offset, limit)
-  )
+  return await api.getDataObjectDetails(bagIds)
 }
 
 /**
@@ -186,10 +184,7 @@ async function getAllAssignedDataObjects(api: QueryNodeApi, bagIds: string[]): P
  * @returns storage bag data
  */
 async function getAllAssignedBags(api: QueryNodeApi, bucketIds: string[]): Promise<StorageBagDetailsFragment[]> {
-  return await getAllObjectsWithPaging(
-    'assigned bags',
-    async (offset, limit) => await api.getStorageBagsDetails(bucketIds, offset, limit)
-  )
+  return await api.getStorageBagsDetails(bucketIds)
 }
 
 /**
@@ -205,7 +200,7 @@ async function getAllObjectsWithPaging<T>(
   query: (offset: number, limit: number) => Promise<T[]>
 ): Promise<T[]> {
   const result = []
-  const limit = 1000
+  const limit = MAX_RESULTS_PER_QUERY
   let offset = 0
 
   let resultPart = []
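
The tail of `getAllObjectsWithPaging` lies outside this hunk; below is a self-contained sketch of the offset/limit loop it presumably implements (the exact exit condition is an assumption):

```typescript
async function getAllWithPaging<T>(
  query: (offset: number, limit: number) => Promise<T[]>,
  limit = 1000 // MAX_RESULTS_PER_QUERY in the real module
): Promise<T[]> {
  const result: T[] = []
  let offset = 0
  let resultPart: T[] = []

  do {
    resultPart = await query(offset, limit)
    result.push(...resultPart)
    offset += limit
  } while (resultPart.length === limit) // stop once a short (or empty) page comes back

  return result
}
```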

+ 44 - 45
storage-node-v2/src/services/sync/synchronizer.ts

@@ -1,11 +1,10 @@
 import { getStorageObligationsFromRuntime, DataObligations } from './storageObligations'
 import logger from '../../services/logger'
+import { getDataObjectIDs } from '../../services/caching/localDataObjects'
 import { SyncTask, DownloadFileTask, DeleteLocalFileTask, PrepareDownloadFileTask } from './tasks'
 import { WorkingStack, TaskProcessorSpawner, TaskSink } from './workingProcess'
 import _ from 'lodash'
-import fs from 'fs'
-import path from 'path'
-const fsPromises = fs.promises
+import { ApiPromise } from '@polkadot/api'
 
 /**
  * Temporary directory name for data uploading.
@@ -18,6 +17,7 @@ export const TempDirName = 'temp'
  * The sync process uses the QueryNode for defining storage obligations and
  * remote storage nodes' URL for data obtaining.
  *
+ * @param api - (optional) runtime API promise
  * @param workerId - current storage provider ID
  * @param asyncWorkersNumber - maximum parallel downloads number
  * @param queryNodeUrl - Query Node endpoint URL
@@ -27,16 +27,18 @@ export const TempDirName = 'temp'
  * Node information about the storage providers.
  */
 export async function performSync(
+  api: ApiPromise | undefined,
   workerId: number,
   asyncWorkersNumber: number,
   queryNodeUrl: string,
   uploadDirectory: string,
+  tempDirectory: string,
   operatorUrl?: string
 ): Promise<void> {
   logger.info('Started syncing...')
   const [model, files] = await Promise.all([
     getStorageObligationsFromRuntime(queryNodeUrl, workerId),
-    getLocalDataObjects(uploadDirectory, TempDirName),
+    getDataObjectIDs(),
   ])
 
   const requiredIds = model.dataObjects.map((obj) => obj.id)
@@ -52,9 +54,17 @@ export async function performSync(
 
   let addedTasks: SyncTask[]
   if (operatorUrl === undefined) {
-    addedTasks = await getPrepareDownloadTasks(model, added, uploadDirectory, workingStack)
+    addedTasks = await getPrepareDownloadTasks(
+      api,
+      model,
+      workerId,
+      added,
+      uploadDirectory,
+      tempDirectory,
+      workingStack
+    )
   } else {
-    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory)
+    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory)
   }
 
   logger.debug(`Sync - started processing...`)
@@ -71,53 +81,62 @@ export async function performSync(
 /**
  * Creates the download preparation tasks.
  *
+ * @param api - Runtime API promise
  * @param dataObligations - defines the current data obligations for the node
  * @param addedIds - data object IDs to download
  * @param uploadDirectory - local directory for data uploading
+ * @param tempDirectory - local directory for temporary data uploading
  * @param taskSink - a destination for the newly created tasks
  */
 async function getPrepareDownloadTasks(
+  api: ApiPromise | undefined,
   dataObligations: DataObligations,
+  currentWorkerId: number,
   addedIds: string[],
   uploadDirectory: string,
+  tempDirectory: string,
   taskSink: TaskSink
 ): Promise<PrepareDownloadFileTask[]> {
-  const idMap = new Map()
+  const bagIdByDataObjectId = new Map()
   for (const entry of dataObligations.dataObjects) {
-    idMap.set(entry.id, entry.bagId)
+    bagIdByDataObjectId.set(entry.id, entry.bagId)
   }
 
-  const bucketMap = new Map()
+  const bucketOperatorUrlById = new Map()
   for (const entry of dataObligations.storageBuckets) {
-    bucketMap.set(entry.id, entry.operatorUrl)
+    // Skip all buckets of the current WorkerId (this storage provider)
+    if (entry.workerId !== currentWorkerId) {
+      bucketOperatorUrlById.set(entry.id, entry.operatorUrl)
+    }
   }
 
-  const bagMap = new Map()
+  const bagOperatorsUrlsById = new Map()
   for (const entry of dataObligations.bags) {
     const operatorUrls = []
 
     for (const bucket of entry.buckets) {
-      if (bucketMap.has(bucket)) {
-        const operatorUrl = bucketMap.get(bucket)
+      if (bucketOperatorUrlById.has(bucket)) {
+        const operatorUrl = bucketOperatorUrlById.get(bucket)
         if (operatorUrl) {
           operatorUrls.push(operatorUrl)
         }
       }
     }
 
-    bagMap.set(entry.id, operatorUrls)
+    bagOperatorsUrlsById.set(entry.id, operatorUrls)
   }
 
   const tasks = addedIds.map((id) => {
     let operatorUrls: string[] = [] // can be empty after look up
-    if (idMap.has(id)) {
-      const bagid = idMap.get(id)
-      if (bagMap.has(bagid)) {
-        operatorUrls = bagMap.get(bagid)
+    let bagId = null
+    if (bagIdByDataObjectId.has(id)) {
+      bagId = bagIdByDataObjectId.get(id)
+      if (bagOperatorsUrlsById.has(bagId)) {
+        operatorUrls = bagOperatorsUrlsById.get(bagId)
       }
     }
 
-    return new PrepareDownloadFileTask(operatorUrls, id, uploadDirectory, taskSink)
+    return new PrepareDownloadFileTask(operatorUrls, bagId, id, uploadDirectory, tempDirectory, taskSink, api)
   })
 
   return tasks
@@ -129,37 +148,17 @@ async function getPrepareDownloadTasks(
  * @param operatorUrl - defines the data source URL.
  * @param addedIds - data object IDs to download
  * @param uploadDirectory - local directory for data uploading
+ * @param tempDirectory - local directory for temporary data uploading
  */
 async function getDownloadTasks(
   operatorUrl: string,
   addedIds: string[],
-  uploadDirectory: string
+  uploadDirectory: string,
+  tempDirectory: string
 ): Promise<DownloadFileTask[]> {
-  const addedTasks = addedIds.map((fileName) => new DownloadFileTask(operatorUrl, fileName, uploadDirectory))
+  const addedTasks = addedIds.map(
+    (fileName) => new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory)
+  )
 
   return addedTasks
 }
-
-/**
- * Returns local data objects info.
- *
- * @param uploadDirectory - local directory to get file names from
- */
-export async function getLocalDataObjects(uploadDirectory: string, tempDirName: string): Promise<string[]> {
-  const localIds = await getLocalFileNames(uploadDirectory)
-
-  // Filter temporary directory name.
-  const tempDirectoryName = path.parse(tempDirName).name
-  const data = localIds.filter((dataObjectId) => dataObjectId !== tempDirectoryName)
-
-  return data
-}
-
-/**
- * Returns file names from the local directory.
- *
- * @param directory - local directory to get file names from
- */
-async function getLocalFileNames(directory: string): Promise<string[]> {
-  return fsPromises.readdir(directory)
-}
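
A hypothetical invocation of `performSync` with its extended signature; the parameter names follow the diff above, while the endpoint URLs, worker id and directories are placeholders. Real code would also register the Joystream runtime types when creating the API, as `createApi` in `runtime/api.ts` does.

```typescript
import { ApiPromise, WsProvider } from '@polkadot/api'
import { performSync, TempDirName } from './synchronizer'

async function runSyncOnce(): Promise<void> {
  const api = await ApiPromise.create({ provider: new WsProvider('ws://localhost:9944') })

  await performSync(
    api, // optional runtime API, used to fetch expected IPFS hashes for verification
    1, // workerId
    20, // asyncWorkersNumber (max parallel downloads)
    'http://localhost:8081/graphql', // queryNodeUrl
    './uploads', // uploadDirectory
    TempDirName, // tempDirectory
    undefined // operatorUrl: undefined => resolve remote nodes from storage obligations
  )
}
```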

+ 102 - 21
storage-node-v2/src/services/sync/tasks.ts

@@ -9,6 +9,12 @@ import logger from '../../services/logger'
 import _ from 'lodash'
 import { getRemoteDataObjects } from './remoteStorageData'
 import { TaskSink } from './workingProcess'
+import { isNewDataObject } from '../caching/newUploads'
+import { addDataObjectIdToCache, deleteDataObjectIdFromCache } from '../caching/localDataObjects'
+import { hashFile } from '../helpers/hashing'
+import { parseBagId } from '../helpers/bagTypes'
+import { hexToString } from '@polkadot/util'
+import { ApiPromise } from '@polkadot/api'
 const fsPromises = fs.promises
 
 /**
@@ -43,8 +49,16 @@ export class DeleteLocalFileTask implements SyncTask {
   }
 
   async execute(): Promise<void> {
+    const dataObjectId = this.filename
+    if (isNewDataObject(dataObjectId)) {
+      logger.warn(`Sync - possible QueryNode update delay (new file) - deleting file canceled: ${this.filename}`)
+      return
+    }
+
     const fullPath = path.join(this.uploadsDirectory, this.filename)
-    return fsPromises.unlink(fullPath)
+    await fsPromises.unlink(fullPath)
+
+    await deleteDataObjectIdFromCache(dataObjectId)
   }
 }
 
@@ -52,14 +66,24 @@ export class DeleteLocalFileTask implements SyncTask {
  * Download the file from the remote storage node to the local storage.
  */
 export class DownloadFileTask implements SyncTask {
-  id: string
+  dataObjectId: string
+  expectedHash?: string
   uploadsDirectory: string
+  tempDirectory: string
   url: string
 
-  constructor(baseUrl: string, id: string, uploadsDirectory: string) {
-    this.id = id
+  constructor(
+    baseUrl: string,
+    dataObjectId: string,
+    expectedHash: string | undefined,
+    uploadsDirectory: string,
+    tempDirectory: string
+  ) {
+    this.dataObjectId = dataObjectId
+    this.expectedHash = expectedHash
     this.uploadsDirectory = uploadsDirectory
-    this.url = urljoin(baseUrl, 'api/v1/files', id)
+    this.tempDirectory = tempDirectory
+    this.url = urljoin(baseUrl, 'api/v1/files', dataObjectId)
   }
 
   description(): string {
@@ -68,28 +92,47 @@ export class DownloadFileTask implements SyncTask {
 
   async execute(): Promise<void> {
     const streamPipeline = promisify(pipeline)
-    const filepath = path.join(this.uploadsDirectory, this.id)
-
+    const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
+    // We create the temp file first to mitigate partial downloads on app (or remote node) crash.
+    // These partial downloads will be cleaned up during the next sync iteration.
+    const tempFilePath = path.join(this.uploadsDirectory, this.tempDirectory, uuidv4())
     try {
       const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
       const request = superagent.get(this.url).timeout(timeoutMs) as unknown as NodeJS.ReadableStream
-
-      // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
-      // This partial downloads will be cleaned up during the next sync iteration.
-      const tempFilePath = path.join(this.uploadsDirectory, uuidv4())
       const fileStream = fs.createWriteStream(tempFilePath)
-      await streamPipeline(request, fileStream)
 
+      request.on('response', (res) => {
+        if (!res.ok) {
+          logger.error(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
+        }
+      })
+      await streamPipeline(request, fileStream)
+      await this.verifyDownloadedFile(tempFilePath)
       await fsPromises.rename(tempFilePath, filepath)
+      await addDataObjectIdToCache(this.dataObjectId)
     } catch (err) {
-      logger.error(`Sync - fetching data error for ${this.url}: ${err}`)
+      logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
       try {
-        logger.warn(`Cleaning up file ${filepath}`)
-        await fs.unlinkSync(filepath)
+        logger.warn(`Cleaning up file ${tempFilePath}`)
+        await fsPromises.unlink(tempFilePath)
       } catch (err) {
-        logger.error(`Sync - cannot cleanup file ${filepath}: ${err}`)
+        logger.error(`Sync - cannot cleanup file ${tempFilePath}: ${err}`, { err })
+      }
+    }
+  }
+
+  /** Compares expected and real IPFS hashes
+   *
+   * @param filePath downloaded file path
+   */
+  async verifyDownloadedFile(filePath: string): Promise<void> {
+    if (!_.isEmpty(this.expectedHash)) {
+      const hash = await hashFile(filePath)
+
+      if (hash !== this.expectedHash) {
+        throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
       }
     }
   }
@@ -99,16 +142,30 @@ export class DownloadFileTask implements SyncTask {
  * Resolve remote storage node URLs and creates file downloading tasks (DownloadFileTask).
  */
 export class PrepareDownloadFileTask implements SyncTask {
+  bagId: string
   dataObjectId: string
   operatorUrlCandidates: string[]
   taskSink: TaskSink
   uploadsDirectory: string
-
-  constructor(operatorUrlCandidates: string[], dataObjectId: string, uploadsDirectory: string, taskSink: TaskSink) {
+  tempDirectory: string
+  api?: ApiPromise
+
+  constructor(
+    operatorUrlCandidates: string[],
+    bagId: string,
+    dataObjectId: string,
+    uploadsDirectory: string,
+    tempDirectory: string,
+    taskSink: TaskSink,
+    api?: ApiPromise
+  ) {
+    this.api = api
+    this.bagId = bagId
     this.dataObjectId = dataObjectId
     this.taskSink = taskSink
     this.operatorUrlCandidates = operatorUrlCandidates
     this.uploadsDirectory = uploadsDirectory
+    this.tempDirectory = tempDirectory
   }
 
   description(): string {
@@ -121,6 +178,11 @@ export class PrepareDownloadFileTask implements SyncTask {
     // And cloning it seems like a heavy operation.
     const operatorUrlIndices: number[] = [...Array(this.operatorUrlCandidates.length).keys()]
 
+    if (_.isEmpty(this.bagId)) {
+      logger.error(`Sync - invalid task - no bagId for ${this.dataObjectId}`)
+      return
+    }
+
     while (!_.isEmpty(operatorUrlIndices)) {
       const randomUrlIndex = _.sample(operatorUrlIndices)
       if (randomUrlIndex === undefined) {
@@ -136,18 +198,37 @@ export class PrepareDownloadFileTask implements SyncTask {
 
       try {
         const chosenBaseUrl = randomUrl
-        const remoteOperatorIds: string[] = await getRemoteDataObjects(chosenBaseUrl)
+        const [remoteOperatorIds, hash] = await Promise.all([
+          getRemoteDataObjects(chosenBaseUrl),
+          this.getExpectedHash(),
+        ])
 
         if (remoteOperatorIds.includes(this.dataObjectId)) {
-          const newTask = new DownloadFileTask(chosenBaseUrl, this.dataObjectId, this.uploadsDirectory)
+          const newTask = new DownloadFileTask(
+            chosenBaseUrl,
+            this.dataObjectId,
+            hash,
+            this.uploadsDirectory,
+            this.tempDirectory
+          )
 
           return this.taskSink.add([newTask])
         }
       } catch (err) {
-        logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`)
+        logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`, { err })
       }
     }
 
     logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
   }
+
+  async getExpectedHash(): Promise<string | undefined> {
+    if (this.api !== undefined) {
+      const convertedBagId = parseBagId(this.bagId)
+      const dataObject = await this.api.query.storage.dataObjectsById(convertedBagId, this.dataObjectId)
+      return hexToString(dataObject.ipfsContentId.toString())
+    }
+
+    return undefined
+  }
 }
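
A hypothetical direct construction of `DownloadFileTask` with the new parameters; the values are placeholders, and in the node these tasks are normally produced by `PrepareDownloadFileTask` or `getDownloadTasks`.

```typescript
import { DownloadFileTask } from './tasks'

const task = new DownloadFileTask(
  'http://remote-node.example:3333', // remote operator base URL
  '123', // dataObjectId
  'QmSomeExpectedIpfsHash', // expectedHash; undefined skips verification
  './uploads', // uploadsDirectory
  'temp' // tempDirectory (relative to uploadsDirectory)
)

// execute() streams into uploads/temp/<uuid>, verifies the hash, renames the file to
// uploads/123 and registers the id in the local data-object cache.
// await task.execute()
```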

+ 4 - 2
storage-node-v2/src/services/sync/workingProcess.ts

@@ -68,10 +68,12 @@ export class WorkingStack implements TaskSink, TaskSource {
 export class TaskProcessor {
   taskSource: TaskSource
   exitOnCompletion: boolean
+  sleepTime: number
 
-  constructor(taskSource: TaskSource, exitOnCompletion = true) {
+  constructor(taskSource: TaskSource, exitOnCompletion = true, sleepTime = 3000) {
     this.taskSource = taskSource
     this.exitOnCompletion = exitOnCompletion
+    this.sleepTime = sleepTime
   }
 
   /**
@@ -93,7 +95,7 @@ export class TaskProcessor {
           return
         }
 
-        await sleep(3000)
+        await sleep(this.sleepTime)
       }
     }
   }
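
A small sketch of the new `sleepTime` knob; `source` stands in for any `TaskSource` implementation such as the `WorkingStack` above, and the name of the processing-loop method is outside this hunk, so it is only referenced in a comment.

```typescript
import { TaskProcessor, TaskSource } from './workingProcess'

declare const source: TaskSource // e.g. a WorkingStack instance

// Poll the source every 5 s instead of the default 3 s, and keep running when it is empty.
const processor = new TaskProcessor(source, false, 5000)
// The long-running processing loop (not shown in this hunk) then awaits tasks from `source`.
```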

+ 6 - 69
storage-node-v2/src/services/webApi/app.ts

@@ -7,65 +7,9 @@ import { HttpError, OpenAPIV3, ValidateSecurityOpts } from 'express-openapi-vali
 import { KeyringPair } from '@polkadot/keyring/types'
 import { ApiPromise } from '@polkadot/api'
 import { RequestData, verifyTokenSignature, parseUploadToken, UploadToken } from '../helpers/auth'
-import { checkRemoveNonce } from '../../services/helpers/tokenNonceKeeper'
+import { checkRemoveNonce } from '../caching/tokenNonceKeeper'
 import { httpLogger, errorLogger } from '../../services/logger'
-
-/**
- * Web application parameters.
- */
-export type AppConfig = {
-  /**
-   * Runtime API promise
-   */
-  api: ApiPromise
-
-  /**
-   * KeyringPair instance
-   */
-  account: KeyringPair
-
-  /**
-   * Storage provider ID (worker ID)
-   */
-  workerId: number
-
-  /**
-   * Directory for the file uploading
-   */
-  uploadsDir: string
-  /**
-   * Directory within the `uploadsDir` for temporary file uploading
-   */
-  tempDirName: string
-
-  /**
-   *  Environment configuration
-   */
-  process: {
-    version: string
-    userAgent: string
-  }
-
-  /**
-   * Query Node endpoint URL
-   */
-  queryNodeUrl: string
-
-  /**
-   * Enables uploading auth-schema validation
-   */
-  enableUploadingAuth: boolean
-
-  /**
-   * ElasticSearch logging endpoint URL
-   */
-  elasticSearchEndpoint?: string
-
-  /**
-   * Max file size for uploading limit.
-   */
-  maxFileSize: number
-}
+import { AppConfig } from './controllers/common'
 
 /**
  * Creates Express web application. Uses the OAS spec file for the API.
@@ -75,8 +19,6 @@ export type AppConfig = {
  */
 export async function createApp(config: AppConfig): Promise<Express> {
   const spec = path.join(__dirname, './../../api-spec/openapi.yaml')
-  const tempFileUploadingDir = path.join(config.uploadsDir, config.tempDirName)
-
   const app = express()
 
   app.use(cors())
@@ -86,13 +28,8 @@ export async function createApp(config: AppConfig): Promise<Express> {
   app.use(
     // Set parameters for each request.
     (req: express.Request, res: express.Response, next: NextFunction) => {
-      res.locals.uploadsDir = config.uploadsDir
-      res.locals.tempFileUploadingDir = tempFileUploadingDir
-      res.locals.storageProviderAccount = config.account
-      res.locals.workerId = config.workerId
-      res.locals.api = config.api
-      res.locals.config = config.process
-      res.locals.queryNodeUrl = config.queryNodeUrl
+      res.locals = config
+
       next()
     },
     // Setup OpenAPiValidator
@@ -106,7 +43,7 @@ export async function createApp(config: AppConfig): Promise<Express> {
         resolver: OpenApiValidator.resolvers.modulePathResolver,
       },
       fileUploader: {
-        dest: tempFileUploadingDir,
+        dest: config.tempFileUploadingDir,
         // Busboy library settings
         limits: {
           // For multipart forms, the max number of file fields (Default: Infinity)
@@ -115,7 +52,7 @@ export async function createApp(config: AppConfig): Promise<Express> {
           fileSize: config.maxFileSize,
         },
       },
-      validateSecurity: setupUploadingValidation(config.enableUploadingAuth, config.api, config.account),
+      validateSecurity: setupUploadingValidation(config.enableUploadingAuth, config.api, config.storageProviderAccount),
     })
   ) // Required signature.
 

+ 70 - 13
storage-node-v2/src/services/webApi/controllers/common.ts

@@ -2,6 +2,9 @@ import * as express from 'express'
 import { CLIError } from '@oclif/errors'
 import { ExtrinsicFailedError } from '../../runtime/api'
 import { BagIdValidationError } from '../../helpers/bagTypes'
+import logger from '../../logger'
+import { ApiPromise } from '@polkadot/api'
+import { KeyringPair } from '@polkadot/keyring/types'
 
 /**
  * Dedicated error for the web api requests.
@@ -32,7 +35,7 @@ export class ServerError extends WebApiError {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getUploadsDir(res: express.Response): string {
+export function getUploadsDir(res: express.Response<unknown, AppConfig>): string {
   if (res.locals.uploadsDir) {
     return res.locals.uploadsDir
   }
@@ -47,7 +50,7 @@ export function getUploadsDir(res: express.Response): string {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getTempFileUploadingDir(res: express.Response): string {
+export function getTempFileUploadingDir(res: express.Response<unknown, AppConfig>): string {
   if (res.locals.tempFileUploadingDir) {
     return res.locals.tempFileUploadingDir
   }
@@ -62,7 +65,7 @@ export function getTempFileUploadingDir(res: express.Response): string {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getWorkerId(res: express.Response): number {
+export function getWorkerId(res: express.Response<unknown, AppConfig>): number {
   if (res.locals.workerId || res.locals.workerId === 0) {
     return res.locals.workerId
   }
@@ -77,9 +80,9 @@ export function getWorkerId(res: express.Response): number {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getQueryNodeUrl(res: express.Response): string {
-  if (res.locals.queryNodeUrl) {
-    return res.locals.queryNodeUrl
+export function getQueryNodeUrl(res: express.Response<unknown, AppConfig>): string {
+  if (res.locals.queryNodeEndpoint) {
+    return res.locals.queryNodeEndpoint
   }
 
   throw new ServerError('No Query Node URL loaded.')
@@ -92,12 +95,12 @@ export function getQueryNodeUrl(res: express.Response): string {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-export function getCommandConfig(res: express.Response): {
+export function getCommandConfig(res: express.Response<unknown, AppConfig>): {
   version: string
   userAgent: string
 } {
-  if (res.locals.config) {
-    return res.locals.config
+  if (res.locals.process) {
+    return res.locals.process
   }
 
   throw new ServerError('Cannot load command config.')
@@ -113,6 +116,7 @@ export function getCommandConfig(res: express.Response): {
  */
 export function sendResponseWithError(res: express.Response, err: Error, errorType: string): void {
   const message = isNofileError(err) ? `File not found.` : err.toString()
+  logger.error(message, { err })
 
   res.status(getHttpStatusCodeByError(err)).json({
     type: errorType,
@@ -149,13 +153,66 @@ export function getHttpStatusCodeByError(err: Error): number {
     return err.httpStatusCode
   }
 
-  if (err instanceof CLIError) {
-    return 400
-  }
-
   if (err instanceof BagIdValidationError) {
     return 400
   }
 
   return 500
 }
+
+/**
+ * Web application parameters.
+ */
+export type AppConfig = {
+  /**
+   * Runtime API promise
+   */
+  api: ApiPromise
+
+  /**
+   * KeyringPair instance
+   */
+  storageProviderAccount: KeyringPair
+
+  /**
+   * Storage provider ID (worker ID)
+   */
+  workerId: number
+
+  /**
+   * Directory for the file uploading
+   */
+  uploadsDir: string
+  /**
+   * Directory for temporary file uploading
+   */
+  tempFileUploadingDir: string
+
+  /**
+   *  Environment configuration
+   */
+  process: {
+    version: string
+    userAgent: string
+  }
+
+  /**
+   * Query Node endpoint URL
+   */
+  queryNodeEndpoint: string
+
+  /**
+   * Enables uploading auth-schema validation
+   */
+  enableUploadingAuth: boolean
+
+  /**
+   * ElasticSearch logging endpoint URL
+   */
+  elasticSearchEndpoint?: string
+
+  /**
+   * Max file size for uploading limit.
+   */
+  maxFileSize: number
+}
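
A short sketch of a controller reading the typed response locals (`AppConfig`) through the helpers above; the route name and the fields returned are illustrative.

```typescript
import * as express from 'express'
import { AppConfig, getUploadsDir, getWorkerId, sendResponseWithError } from './common'

export async function getStatus(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
  try {
    res.status(200).json({ workerId: getWorkerId(res), uploadsDir: getUploadsDir(res) })
  } catch (err) {
    sendResponseWithError(res, err as Error, 'status')
  }
}
```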

+ 24 - 14
storage-node-v2/src/services/webApi/controllers/publicApi.ts → storage-node-v2/src/services/webApi/controllers/filesApi.ts

@@ -6,11 +6,13 @@ import {
   createUploadToken,
   verifyTokenSignature,
 } from '../../helpers/auth'
-import { hashFile } from '../../../services/helpers/hashing'
-import { createNonce, getTokenExpirationTime } from '../../../services/helpers/tokenNonceKeeper'
-import { getFileInfo } from '../../../services/helpers/fileInfo'
+import { hashFile } from '../../helpers/hashing'
+import { registerNewDataObjectId } from '../../caching/newUploads'
+import { addDataObjectIdToCache } from '../../caching/localDataObjects'
+import { createNonce, getTokenExpirationTime } from '../../caching/tokenNonceKeeper'
+import { getFileInfo } from '../../helpers/fileInfo'
 import { BagId } from '@joystream/types/storage'
-import logger from '../../../services/logger'
+import logger from '../../logger'
 import { KeyringPair } from '@polkadot/keyring/types'
 import { ApiPromise } from '@polkadot/api'
 import * as express from 'express'
@@ -29,15 +31,16 @@ import {
   getCommandConfig,
   sendResponseWithError,
   getHttpStatusCodeByError,
+  AppConfig,
 } from './common'
-import { getStorageBucketIdsByWorkerId } from '../../../services/sync/storageObligations'
+import { getStorageBucketIdsByWorkerId } from '../../sync/storageObligations'
 import { Membership } from '@joystream/types/members'
 const fsPromises = fs.promises
 
 /**
  * A public endpoint: serves files by data object ID.
  */
-export async function getFile(req: express.Request, res: express.Response): Promise<void> {
+export async function getFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
     const uploadsDir = getUploadsDir(res)
@@ -68,7 +71,7 @@ export async function getFile(req: express.Request, res: express.Response): Prom
 /**
  * A public endpoint: sends file headers by data object ID.
  */
-export async function getFileHeaders(req: express.Request, res: express.Response): Promise<void> {
+export async function getFileHeaders(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const dataObjectId = getDataObjectId(req)
     const uploadsDir = getUploadsDir(res)
@@ -89,7 +92,7 @@ export async function getFileHeaders(req: express.Request, res: express.Response
 /**
  * A public endpoint: receives file.
  */
-export async function uploadFile(req: express.Request, res: express.Response): Promise<void> {
+export async function uploadFile(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   const uploadRequest: RequestData = req.body
 
   // saved filename to delete on verification or extrinsic errors
@@ -97,7 +100,6 @@ export async function uploadFile(req: express.Request, res: express.Response): P
   try {
     const fileObj = getFileObject(req)
     cleanupFileName = fileObj.path
-
     const queryNodeUrl = getQueryNodeUrl(res)
     const workerId = getWorkerId(res)
 
@@ -111,8 +113,12 @@ export async function uploadFile(req: express.Request, res: express.Response): P
     const accepted = await verifyDataObjectInfo(api, bagId, uploadRequest.dataObjectId, fileObj.size, hash)
 
     // Prepare new file name
+    const dataObjectId = uploadRequest.dataObjectId.toString()
     const uploadsDir = getUploadsDir(res)
-    const newPath = path.join(uploadsDir, uploadRequest.dataObjectId.toString())
+    const newPath = path.join(uploadsDir, dataObjectId)
+
+    registerNewDataObjectId(dataObjectId)
+    await addDataObjectIdToCache(dataObjectId)
 
     // Overwrites existing file.
     await fsPromises.rename(fileObj.path, newPath)
@@ -127,6 +133,7 @@ export async function uploadFile(req: express.Request, res: express.Response): P
         `Received already accepted data object. DataObjectId = ${uploadRequest.dataObjectId} WorkerId = ${workerId}`
       )
     }
+
     res.status(201).json({
       id: hash,
     })
@@ -140,7 +147,10 @@ export async function uploadFile(req: express.Request, res: express.Response): P
 /**
  * A public endpoint: creates auth token for file uploads.
  */
-export async function authTokenForUploading(req: express.Request, res: express.Response): Promise<void> {
+export async function authTokenForUploading(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
     const account = getAccount(res)
     const tokenRequest = getTokenRequest(req)
@@ -190,7 +200,7 @@ function getFileObject(req: express.Request): Express.Multer.File {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-function getAccount(res: express.Response): KeyringPair {
+function getAccount(res: express.Response<unknown, AppConfig>): KeyringPair {
   if (res.locals.storageProviderAccount) {
     return res.locals.storageProviderAccount
   }
@@ -205,7 +215,7 @@ function getAccount(res: express.Response): KeyringPair {
  * This is a helper function. It parses the response object for a variable and
  * throws an error on failure.
  */
-function getApi(res: express.Response): ApiPromise {
+function getApi(res: express.Response<unknown, AppConfig>): ApiPromise {
   if (res.locals.api) {
     return res.locals.api
   }
@@ -327,7 +337,7 @@ async function cleanupFileOnError(cleanupFileName: string, error: string): Promi
 /**
  * A public endpoint: return the server version.
  */
-export async function getVersion(req: express.Request, res: express.Response): Promise<void> {
+export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const config = getCommandConfig(res)
 
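
A hedged reading of why `uploadFile` above updates both caches: the id is registered as a "new upload" so a sync run will not delete it before the query node has indexed it, and it is added to the local data-object id cache so the state endpoints see it immediately.

```typescript
import { registerNewDataObjectId } from '../../caching/newUploads'
import { addDataObjectIdToCache } from '../../caching/localDataObjects'

async function rememberUpload(dataObjectId: string): Promise<void> {
  // Mark as a fresh upload: DeleteLocalFileTask checks isNewDataObject() and skips deletion.
  registerNewDataObjectId(dataObjectId)
  // Make the id visible to the state endpoints that read the local data-object cache.
  await addDataObjectIdToCache(dataObjectId)
}
```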

+ 17 - 29
storage-node-v2/src/services/webApi/controllers/stateApi.ts

@@ -1,4 +1,4 @@
-import { getLocalDataObjects } from '../../../services/sync/synchronizer'
+import { getDataObjectIDs } from '../../../services/caching/localDataObjects'
 import * as express from 'express'
 import _ from 'lodash'
 import { getDataObjectIDsByBagId } from '../../sync/storageObligations'
@@ -9,6 +9,7 @@ import {
   WebApiError,
   getCommandConfig,
   sendResponseWithError,
+  AppConfig,
 } from './common'
 import fastFolderSize from 'fast-folder-size'
 import { promisify } from 'util'
@@ -28,12 +29,12 @@ const dataCache = new NodeCache({
 /**
  * A public endpoint: return all local data objects.
  */
-export async function getAllLocalDataObjects(req: express.Request, res: express.Response): Promise<void> {
+export async function getAllLocalDataObjects(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
-    const uploadsDir = getUploadsDir(res)
-    const tempFileDir = getTempFileUploadingDir(res)
-
-    const ids = await getCachedLocalDataObjects(uploadsDir, tempFileDir)
+    const ids = await getDataObjectIDs()
 
     res.status(200).json(ids)
   } catch (err) {
@@ -46,7 +47,10 @@ export async function getAllLocalDataObjects(req: express.Request, res: express.
  *
  *  @return total size and count of the data objects.
  */
-export async function getLocalDataStats(req: express.Request, res: express.Response): Promise<void> {
+export async function getLocalDataStats(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
     const uploadsDir = getUploadsDir(res)
     const tempFileDir = getTempFileUploadingDir(res)
@@ -89,16 +93,16 @@ export async function getLocalDataStats(req: express.Request, res: express.Respo
 /**
  * A public endpoint: return local data objects for the bag.
  */
-export async function getLocalDataObjectsByBagId(req: express.Request, res: express.Response): Promise<void> {
+export async function getLocalDataObjectsByBagId(
+  req: express.Request,
+  res: express.Response<unknown, AppConfig>
+): Promise<void> {
   try {
-    const uploadsDir = getUploadsDir(res)
-    const tempFileDir = getTempFileUploadingDir(res)
-
     const queryNodeUrl = getQueryNodeUrl(res)
     const bagId = getBagId(req)
 
     const [ids, requiredIds] = await Promise.all([
-      getCachedLocalDataObjects(uploadsDir, tempFileDir),
+      getDataObjectIDs(),
       getCachedDataObjectsObligations(queryNodeUrl, bagId),
     ])
 
@@ -113,7 +117,7 @@ export async function getLocalDataObjectsByBagId(req: express.Request, res: expr
 /**
  * A public endpoint: return the server version.
  */
-export async function getVersion(req: express.Request, res: express.Response): Promise<void> {
+export async function getVersion(req: express.Request, res: express.Response<unknown, AppConfig>): Promise<void> {
   try {
     const config = getCommandConfig(res)
 
@@ -143,22 +147,6 @@ function getBagId(req: express.Request): string {
   throw new WebApiError('No bagId provided.', 400)
 }
 
-/**
- * Returns cached data objects IDs from the local data storage. Data could be
- * obsolete until cache expiration.
- *
- */
-async function getCachedLocalDataObjects(uploadsDir: string, tempDirName: string): Promise<string[]> {
-  const entryName = 'local_data_object'
-
-  if (!dataCache.has(entryName)) {
-    const data = await getLocalDataObjects(uploadsDir, tempDirName)
-
-    dataCache.set(entryName, data)
-  }
-  return dataCache.get(entryName) ?? []
-}
-
 /**
  * Returns cached data objects IDs from the local data storage. Data could be
  * obsolete until cache expiration.
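
A sketch of the caching pattern the remaining helper presumably follows (the body of `getCachedDataObjectsObligations` is outside this diff); the cache key, TTL values and import paths are assumptions.

```typescript
import NodeCache from 'node-cache'
import { getDataObjectIDsByBagId } from '../../sync/storageObligations'

const dataCache = new NodeCache({ stdTTL: 30, checkperiod: 60 })

// Returns the query node's data-object obligations for a bag, served from cache until the TTL expires.
async function getCachedDataObjectsObligations(queryNodeUrl: string, bagId: string): Promise<string[]> {
  const entryName = `data_object_obligations_${bagId}`
  if (!dataCache.has(entryName)) {
    dataCache.set(entryName, await getDataObjectIDsByBagId(queryNodeUrl, bagId))
  }
  return dataCache.get(entryName) ?? []
}
```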

The file diff is not shown because of its large size
+ 330 - 197
yarn.lock


Some files were not shown because of the large number of changed files