Переглянути джерело

storage-node: update for new runtime changes

Mokhtar Naamani 4 роки тому
батько
коміт
f961483dd5

+ 1 - 1
cli/src/Api.ts

@@ -49,7 +49,7 @@ import { Stake, StakeId } from '@joystream/types/stake'
 
 import { InputValidationLengthConstraint } from '@joystream/types/common'
 import { Class, ClassId, CuratorGroup, CuratorGroupId, Entity, EntityId } from '@joystream/types/content-directory'
-import { ContentId, DataObject } from '@joystream/types/media'
+import { ContentId, DataObject } from '@joystream/types/storage'
 import { ServiceProviderRecord, Url } from '@joystream/types/discovery'
 import _ from 'lodash'
 

+ 1 - 1
cli/src/commands/media/uploadVideo.ts

@@ -8,7 +8,7 @@ import { JsonSchemaPrompter } from '../../helpers/JsonSchemaPrompt'
 import { flags } from '@oclif/command'
 import fs from 'fs'
 import ExitCodes from '../../ExitCodes'
-import { ContentId } from '@joystream/types/media'
+import { ContentId } from '@joystream/types/storage'
 import ipfsHash from 'ipfs-only-hash'
 import { cli } from 'cli-ux'
 import axios, { AxiosRequestConfig } from 'axios'

+ 2 - 6
storage-node/packages/cli/src/commands/upload.ts

@@ -108,7 +108,7 @@ export class UploadCommand extends BaseCommand {
   // Creates the DataObject in the runtime.
   private async createContent(p: AddContentParams): Promise<DataObject> {
     try {
-      const dataObject: Option<DataObject> = await this.api.assets.createDataObject(
+      const dataObject: DataObject = await this.api.assets.createDataObject(
         p.accountId,
         p.memberId,
         p.contentId,
@@ -117,11 +117,7 @@ export class UploadCommand extends BaseCommand {
         p.ipfsCid
       )
 
-      if (dataObject.isNone) {
-        this.fail('Cannot create data object: got None object')
-      }
-
-      return dataObject.unwrap()
+      return dataObject
     } catch (err) {
       this.fail(`Cannot create data object: ${err}`)
     }

+ 2 - 2
storage-node/packages/colossus/bin/cli.js

@@ -149,11 +149,11 @@ function getStorage(runtimeApi, { ipfsHost }) {
     resolve_content_id: async (contentId) => {
       // Resolve via API
       const obj = await runtimeApi.assets.getDataObject(contentId)
-      if (!obj || obj.isNone) {
+      if (!obj) {
         return
       }
       // if obj.liaison_judgement !== Accepted .. throw ?
-      return obj.unwrap().ipfs_content_id.toString()
+      return obj.ipfs_content_id.toString()
     },
     ipfsHost,
   }

+ 1 - 1
storage-node/packages/colossus/lib/sync.js

@@ -20,7 +20,7 @@
 
 const debug = require('debug')('joystream:sync')
 const _ = require('lodash')
-const { ContentId } = require('@joystream/types/media')
+const { ContentId } = require('@joystream/types/storage')
 // The number of concurrent sync sessions allowed. Must be greater than zero.
 const MAX_CONCURRENT_SYNC_ITEMS = 20
 

+ 26 - 14
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -27,6 +27,8 @@ function errorHandler(response, err, code) {
   response.status(err.code || code || 500).send({ message: err.toString() })
 }
 
+const PROCESS_UPLOAD_BALANCE = 3
+
 module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
   // Creat the IPFS HTTP Gateway proxy middleware
   const proxy = ipfsProxy.createProxy(storage, ipfsHttpGatewayUrl)
@@ -52,6 +54,16 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
         return
       }
 
+      // TODO: Do early filter on content_length..(do not wait for fileInfo)
+      // ensure it equals the data object claimed size, and less than max allowed by
+      // node policy.
+
+      // get content_length from request
+      // const fileSizesEqual = dataObject.size_in_bytes.eq(content_length)
+      // if (!fileSizesEqual) {
+      //   return res.status(403).send({ message: 'Upload size does not match expected size of content' })
+      // }
+
       const id = req.params.id // content id
 
       // First check if we're the liaison for the name, otherwise we can bail
@@ -60,15 +72,13 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
       const providerId = runtime.storageProviderId
       let dataObject
       try {
-        debug('calling checkLiaisonForDataObject')
         dataObject = await runtime.assets.checkLiaisonForDataObject(providerId, id)
-        debug('called checkLiaisonForDataObject')
       } catch (err) {
         errorHandler(res, err, 403)
         return
       }
 
-      const sufficientBalance = await runtime.providerHasMinimumBalance(3)
+      const sufficientBalance = await runtime.providerHasMinimumBalance(PROCESS_UPLOAD_BALANCE)
 
       if (!sufficientBalance) {
         errorHandler(res, 'Insufficient balance to process upload!', 503)
@@ -92,11 +102,13 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
           }
         }
 
+        // May occurs before entire stream is processed, at the end of stream
+        // or possibly never.
         stream.on('fileInfo', async (info) => {
           try {
             debug('Detected file info:', info)
 
-            // Filter
+            // Filter allowed content types
             const filterResult = filter({}, req.headers, info.mimeType)
             if (filterResult.code !== 200) {
               debug('Rejecting content', filterResult.message)
@@ -105,13 +117,13 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
 
               // Reject the content
               await runtime.assets.rejectContent(roleAddress, providerId, id)
-              return
-            }
-            debug('Content accepted.')
-            accepted = true
+            } else {
+              debug('Content accepted.')
+              accepted = true
 
-            // We may have to commit the stream.
-            possiblyCommit()
+              // We may have to commit the stream.
+              possiblyCommit()
+            }
           } catch (err) {
             errorHandler(res, err)
           }
@@ -161,12 +173,12 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
       }
     },
 
-    async get(req, res) {
-      proxy(req, res)
+    async get(req, res, next) {
+      proxy(req, res, next)
     },
 
-    async head(req, res) {
-      proxy(req, res)
+    async head(req, res, next) {
+      proxy(req, res, next)
     },
   }
 

+ 1 - 1
storage-node/packages/helios/bin/cli.js

@@ -48,7 +48,7 @@ async function assetRelationshipState(api, contentId, providers) {
     })
   )
 
-  return [activeRelationships.filter((active) => active).length, dataObject.unwrap().liaison_judgement]
+  return [activeRelationships.filter((active) => active).length, dataObject.liaison_judgement]
 }
 
 // HTTP HEAD with axios all known content ids on each provider

+ 9 - 3
storage-node/packages/runtime-api/assets.js

@@ -59,7 +59,8 @@ class AssetsApi {
     contentId = parseContentId(contentId)
     const storageSize = await this.base.api.query.dataDirectory.dataByContentId.size(contentId)
     if (!storageSize) {
-      throw new Error(`No DataObject found for content ID: ${contentId}`)
+      // throw new Error(`No DataObject found for content ID: ${contentId}`)
+      return null
     }
     return this.base.api.query.dataDirectory.dataByContentId(contentId)
   }
@@ -77,12 +78,16 @@ class AssetsApi {
 
     const obj = await this.getDataObject(contentId)
 
+    if (!obj) {
+      throw new Error(`No DataObject found for content ID: ${contentId}`)
+    }
+
     if (!obj.liaison.eq(storageProviderId)) {
       throw new Error(`This storage node is not liaison for the content ID: ${contentId}`)
     }
 
     if (obj.liaison_judgement.type !== 'Pending') {
-      throw new Error(`Expected Pending judgement, but found: ${obj.liaison_judgement.type}`)
+      throw new Error(`Content upload has already been processed.`)
     }
 
     return obj
@@ -155,7 +160,8 @@ class AssetsApi {
    * Returns array of all the content ids in storage
    */
   async getKnownContentIds() {
-    return this.base.api.query.dataDirectory.dataByContentId.keys().map(({ args: [contentId] }) => contentId)
+    const keys = await this.base.api.query.dataDirectory.dataByContentId.keys()
+    return keys.map(({ args: [contentId] }) => contentId)
   }
 }
 

+ 1 - 1
storage-node/packages/runtime-api/test/assets.js

@@ -31,7 +31,7 @@ describe('Assets', () => {
 
   it('returns DataObjects for a content ID', async () => {
     const obj = await api.assets.getDataObject('foo')
-    expect(obj.isNone).to.be.true
+    expect(obj).to.be.null
   })
 
   it('can check the liaison for a DataObject', async () => {

+ 1 - 1
storage-node/packages/storage/storage.js

@@ -102,7 +102,7 @@ class StorageWriteStream extends Transform {
     this.temp.write(chunk)
 
     // Try to detect file type during streaming.
-    if (!this.fileInfo && this.buf < fileType.minimumBytes) {
+    if (!this.fileInfo && this.buf.byteLength < fileType.minimumBytes) {
       this.buf = Buffer.concat([this.buf, chunk])
 
       if (this.buf >= fileType.minimumBytes) {

+ 1 - 1
tests/network-tests/src/Api.ts

@@ -34,7 +34,7 @@ import { ChannelEntity } from '@joystream/cd-schemas/types/entities/ChannelEntit
 import { VideoEntity } from '@joystream/cd-schemas/types/entities/VideoEntity'
 import { initializeContentDir, InputParser } from '@joystream/cd-schemas'
 import { OperationType } from '@joystream/types/content-directory'
-import { ContentId, DataObject } from '@joystream/types/media'
+import { ContentId, DataObject } from '@joystream/types/storage'
 import Debugger from 'debug'
 
 export enum WorkingGroups {

+ 1 - 1
tests/network-tests/src/flows/storageNode/getContentFromStorageNode.ts

@@ -1,6 +1,6 @@
 import axios from 'axios'
 import { assert } from 'chai'
-import { ContentId } from '@joystream/types/media'
+import { ContentId } from '@joystream/types/storage'
 import { registry } from '@joystream/types'
 
 import { FlowProps } from '../../Flow'